b77f054eb30c25ccbd09b7541979c3fe21f3f728
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 import logging
28 import queue
29 import threading
30 import time
31 import yaml
32 from copy import deepcopy
33 from http import HTTPStatus
34 from os import mkdir
35 from importlib_metadata import entry_points
36 from shutil import rmtree
37 from unittest.mock import Mock
38
39 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
40 from osm_common.dbbase import DbException
41 from osm_ro_plugin.vim_dummy import VimDummyConnector
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin import vimconn, sdnconn
44 from osm_ng_ro.vim_admin import LockRenew
45
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
51 def deep_get(target_dict, *args, **kwargs):
52 """
53 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
54 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
55 :param target_dict: dictionary to be read
56 :param args: list of keys to read from target_dict
57 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
58 :return: The wanted value if exist, None or default otherwise
59 """
60 for key in args:
61 if not isinstance(target_dict, dict) or key not in target_dict:
62 return kwargs.get("default")
63 target_dict = target_dict[key]
64 return target_dict
65
66
67 class NsWorkerException(Exception):
68 pass
69
70
71 class FailingConnector:
72 def __init__(self, error_msg):
73 self.error_msg = error_msg
74
75 for method in dir(vimconn.VimConnector):
76 if method[0] != "_":
77 setattr(
78 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
79 )
80
81 for method in dir(sdnconn.SdnConnectorBase):
82 if method[0] != "_":
83 setattr(
84 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
85 )
86
87
88 class NsWorkerExceptionNotFound(NsWorkerException):
89 pass
90
91
92 class VimInteractionBase:
93 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
94 It implements methods that does nothing and return ok"""
95
96 def __init__(self, db, my_vims, db_vims, logger):
97 self.db = db
98 self.logger = logger
99 self.my_vims = my_vims
100 self.db_vims = db_vims
101
102 def new(self, ro_task, task_index, task_depends):
103 return "BUILD", {}
104
105 def refresh(self, ro_task):
106 """skip calling VIM to get image, flavor status. Assumes ok"""
107 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
108 return "FAILED", {}
109
110 return "DONE", {}
111
112 def delete(self, ro_task, task_index):
113 """skip calling VIM to delete image. Assumes ok"""
114 return "DONE", {}
115
116 def exec(self, ro_task, task_index, task_depends):
117 return "DONE", None, None
118
119
120 class VimInteractionNet(VimInteractionBase):
121 def new(self, ro_task, task_index, task_depends):
122 vim_net_id = None
123 task = ro_task["tasks"][task_index]
124 task_id = task["task_id"]
125 created = False
126 created_items = {}
127 target_vim = self.my_vims[ro_task["target_id"]]
128 mgmtnet = False
129 mgmtnet_defined_in_vim = False
130
131 try:
132 # FIND
133 if task.get("find_params"):
134 # if management, get configuration of VIM
135 if task["find_params"].get("filter_dict"):
136 vim_filter = task["find_params"]["filter_dict"]
137 # management network
138 elif task["find_params"].get("mgmt"):
139 mgmtnet = True
140 if deep_get(
141 self.db_vims[ro_task["target_id"]],
142 "config",
143 "management_network_id",
144 ):
145 mgmtnet_defined_in_vim = True
146 vim_filter = {
147 "id": self.db_vims[ro_task["target_id"]]["config"][
148 "management_network_id"
149 ]
150 }
151 elif deep_get(
152 self.db_vims[ro_task["target_id"]],
153 "config",
154 "management_network_name",
155 ):
156 mgmtnet_defined_in_vim = True
157 vim_filter = {
158 "name": self.db_vims[ro_task["target_id"]]["config"][
159 "management_network_name"
160 ]
161 }
162 else:
163 vim_filter = {"name": task["find_params"]["name"]}
164 else:
165 raise NsWorkerExceptionNotFound(
166 "Invalid find_params for new_net {}".format(task["find_params"])
167 )
168
169 vim_nets = target_vim.get_network_list(vim_filter)
170 if not vim_nets and not task.get("params"):
171 # If there is mgmt-network in the descriptor,
172 # there is no mapping of that network to a VIM network in the descriptor,
173 # also there is no mapping in the "--config" parameter or at VIM creation;
174 # that mgmt-network will be created.
175 if mgmtnet and not mgmtnet_defined_in_vim:
176 net_name = (
177 vim_filter.get("name")
178 if vim_filter.get("name")
179 else vim_filter.get("id")[:16]
180 )
181 vim_net_id, created_items = target_vim.new_network(
182 net_name, None
183 )
184 self.logger.debug(
185 "Created mgmt network vim_net_id: {}".format(vim_net_id)
186 )
187 created = True
188 else:
189 raise NsWorkerExceptionNotFound(
190 "Network not found with this criteria: '{}'".format(
191 task.get("find_params")
192 )
193 )
194 elif len(vim_nets) > 1:
195 raise NsWorkerException(
196 "More than one network found with this criteria: '{}'".format(
197 task["find_params"]
198 )
199 )
200
201 if vim_nets:
202 vim_net_id = vim_nets[0]["id"]
203 else:
204 # CREATE
205 params = task["params"]
206 vim_net_id, created_items = target_vim.new_network(**params)
207 created = True
208
209 ro_vim_item_update = {
210 "vim_id": vim_net_id,
211 "vim_status": "BUILD",
212 "created": created,
213 "created_items": created_items,
214 "vim_details": None,
215 }
216 self.logger.debug(
217 "task={} {} new-net={} created={}".format(
218 task_id, ro_task["target_id"], vim_net_id, created
219 )
220 )
221
222 return "BUILD", ro_vim_item_update
223 except (vimconn.VimConnException, NsWorkerException) as e:
224 self.logger.error(
225 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
226 )
227 ro_vim_item_update = {
228 "vim_status": "VIM_ERROR",
229 "created": created,
230 "vim_details": str(e),
231 }
232
233 return "FAILED", ro_vim_item_update
234
235 def refresh(self, ro_task):
236 """Call VIM to get network status"""
237 ro_task_id = ro_task["_id"]
238 target_vim = self.my_vims[ro_task["target_id"]]
239 vim_id = ro_task["vim_info"]["vim_id"]
240 net_to_refresh_list = [vim_id]
241
242 try:
243 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
244 vim_info = vim_dict[vim_id]
245
246 if vim_info["status"] == "ACTIVE":
247 task_status = "DONE"
248 elif vim_info["status"] == "BUILD":
249 task_status = "BUILD"
250 else:
251 task_status = "FAILED"
252 except vimconn.VimConnException as e:
253 # Mark all tasks at VIM_ERROR status
254 self.logger.error(
255 "ro_task={} vim={} get-net={}: {}".format(
256 ro_task_id, ro_task["target_id"], vim_id, e
257 )
258 )
259 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
260 task_status = "FAILED"
261
262 ro_vim_item_update = {}
263 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
264 ro_vim_item_update["vim_status"] = vim_info["status"]
265
266 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
267 ro_vim_item_update["vim_name"] = vim_info.get("name")
268
269 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
270 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
271 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
272 elif vim_info["status"] == "DELETED":
273 ro_vim_item_update["vim_id"] = None
274 ro_vim_item_update["vim_details"] = "Deleted externally"
275 else:
276 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
277 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
278
279 if ro_vim_item_update:
280 self.logger.debug(
281 "ro_task={} {} get-net={}: status={} {}".format(
282 ro_task_id,
283 ro_task["target_id"],
284 vim_id,
285 ro_vim_item_update.get("vim_status"),
286 ro_vim_item_update.get("vim_details")
287 if ro_vim_item_update.get("vim_status") != "ACTIVE"
288 else "",
289 )
290 )
291
292 return task_status, ro_vim_item_update
293
294 def delete(self, ro_task, task_index):
295 task = ro_task["tasks"][task_index]
296 task_id = task["task_id"]
297 net_vim_id = ro_task["vim_info"]["vim_id"]
298 ro_vim_item_update_ok = {
299 "vim_status": "DELETED",
300 "created": False,
301 "vim_details": "DELETED",
302 "vim_id": None,
303 }
304
305 try:
306 if net_vim_id or ro_task["vim_info"]["created_items"]:
307 target_vim = self.my_vims[ro_task["target_id"]]
308 target_vim.delete_network(
309 net_vim_id, ro_task["vim_info"]["created_items"]
310 )
311 except vimconn.VimConnNotFoundException:
312 ro_vim_item_update_ok["vim_details"] = "already deleted"
313 except vimconn.VimConnException as e:
314 self.logger.error(
315 "ro_task={} vim={} del-net={}: {}".format(
316 ro_task["_id"], ro_task["target_id"], net_vim_id, e
317 )
318 )
319 ro_vim_item_update = {
320 "vim_status": "VIM_ERROR",
321 "vim_details": "Error while deleting: {}".format(e),
322 }
323
324 return "FAILED", ro_vim_item_update
325
326 self.logger.debug(
327 "task={} {} del-net={} {}".format(
328 task_id,
329 ro_task["target_id"],
330 net_vim_id,
331 ro_vim_item_update_ok.get("vim_details", ""),
332 )
333 )
334
335 return "DONE", ro_vim_item_update_ok
336
337
338 class VimInteractionVdu(VimInteractionBase):
339 max_retries_inject_ssh_key = 20 # 20 times
340 time_retries_inject_ssh_key = 30 # wevery 30 seconds
341
342 def new(self, ro_task, task_index, task_depends):
343 task = ro_task["tasks"][task_index]
344 task_id = task["task_id"]
345 created = False
346 created_items = {}
347 target_vim = self.my_vims[ro_task["target_id"]]
348
349 try:
350 created = True
351 params = task["params"]
352 params_copy = deepcopy(params)
353 net_list = params_copy["net_list"]
354
355 for net in net_list:
356 # change task_id into network_id
357 if "net_id" in net and net["net_id"].startswith("TASK-"):
358 network_id = task_depends[net["net_id"]]
359
360 if not network_id:
361 raise NsWorkerException(
362 "Cannot create VM because depends on a network not created or found "
363 "for {}".format(net["net_id"])
364 )
365
366 net["net_id"] = network_id
367
368 if params_copy["image_id"].startswith("TASK-"):
369 params_copy["image_id"] = task_depends[params_copy["image_id"]]
370
371 if params_copy["flavor_id"].startswith("TASK-"):
372 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
373
374 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
375 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
376
377 ro_vim_item_update = {
378 "vim_id": vim_vm_id,
379 "vim_status": "BUILD",
380 "created": created,
381 "created_items": created_items,
382 "vim_details": None,
383 "interfaces_vim_ids": interfaces,
384 "interfaces": [],
385 }
386 self.logger.debug(
387 "task={} {} new-vm={} created={}".format(
388 task_id, ro_task["target_id"], vim_vm_id, created
389 )
390 )
391
392 return "BUILD", ro_vim_item_update
393 except (vimconn.VimConnException, NsWorkerException) as e:
394 self.logger.error(
395 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
396 )
397 ro_vim_item_update = {
398 "vim_status": "VIM_ERROR",
399 "created": created,
400 "vim_details": str(e),
401 }
402
403 return "FAILED", ro_vim_item_update
404
405 def delete(self, ro_task, task_index):
406 task = ro_task["tasks"][task_index]
407 task_id = task["task_id"]
408 vm_vim_id = ro_task["vim_info"]["vim_id"]
409 ro_vim_item_update_ok = {
410 "vim_status": "DELETED",
411 "created": False,
412 "vim_details": "DELETED",
413 "vim_id": None,
414 }
415
416 try:
417 if vm_vim_id or ro_task["vim_info"]["created_items"]:
418 target_vim = self.my_vims[ro_task["target_id"]]
419 target_vim.delete_vminstance(
420 vm_vim_id, ro_task["vim_info"]["created_items"]
421 )
422 except vimconn.VimConnNotFoundException:
423 ro_vim_item_update_ok["vim_details"] = "already deleted"
424 except vimconn.VimConnException as e:
425 self.logger.error(
426 "ro_task={} vim={} del-vm={}: {}".format(
427 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
428 )
429 )
430 ro_vim_item_update = {
431 "vim_status": "VIM_ERROR",
432 "vim_details": "Error while deleting: {}".format(e),
433 }
434
435 return "FAILED", ro_vim_item_update
436
437 self.logger.debug(
438 "task={} {} del-vm={} {}".format(
439 task_id,
440 ro_task["target_id"],
441 vm_vim_id,
442 ro_vim_item_update_ok.get("vim_details", ""),
443 )
444 )
445
446 return "DONE", ro_vim_item_update_ok
447
448 def refresh(self, ro_task):
449 """Call VIM to get vm status"""
450 ro_task_id = ro_task["_id"]
451 target_vim = self.my_vims[ro_task["target_id"]]
452 vim_id = ro_task["vim_info"]["vim_id"]
453
454 if not vim_id:
455 return None, None
456
457 vm_to_refresh_list = [vim_id]
458 try:
459 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
460 vim_info = vim_dict[vim_id]
461
462 if vim_info["status"] == "ACTIVE":
463 task_status = "DONE"
464 elif vim_info["status"] == "BUILD":
465 task_status = "BUILD"
466 else:
467 task_status = "FAILED"
468
469 # try to load and parse vim_information
470 try:
471 vim_info_info = yaml.safe_load(vim_info["vim_info"])
472 if vim_info_info.get("name"):
473 vim_info["name"] = vim_info_info["name"]
474 except Exception:
475 pass
476 except vimconn.VimConnException as e:
477 # Mark all tasks at VIM_ERROR status
478 self.logger.error(
479 "ro_task={} vim={} get-vm={}: {}".format(
480 ro_task_id, ro_task["target_id"], vim_id, e
481 )
482 )
483 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
484 task_status = "FAILED"
485
486 ro_vim_item_update = {}
487
488 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
489 vim_interfaces = []
490 if vim_info.get("interfaces"):
491 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
492 iface = next(
493 (
494 iface
495 for iface in vim_info["interfaces"]
496 if vim_iface_id == iface["vim_interface_id"]
497 ),
498 None,
499 )
500 # if iface:
501 # iface.pop("vim_info", None)
502 vim_interfaces.append(iface)
503
504 task_create = next(
505 t
506 for t in ro_task["tasks"]
507 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
508 )
509 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
510 vim_interfaces[task_create["mgmt_vnf_interface"]][
511 "mgmt_vnf_interface"
512 ] = True
513
514 mgmt_vdu_iface = task_create.get(
515 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
516 )
517 if vim_interfaces:
518 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
519
520 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
521 ro_vim_item_update["interfaces"] = vim_interfaces
522
523 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
524 ro_vim_item_update["vim_status"] = vim_info["status"]
525
526 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
527 ro_vim_item_update["vim_name"] = vim_info.get("name")
528
529 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
530 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
531 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
532 elif vim_info["status"] == "DELETED":
533 ro_vim_item_update["vim_id"] = None
534 ro_vim_item_update["vim_details"] = "Deleted externally"
535 else:
536 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
537 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
538
539 if ro_vim_item_update:
540 self.logger.debug(
541 "ro_task={} {} get-vm={}: status={} {}".format(
542 ro_task_id,
543 ro_task["target_id"],
544 vim_id,
545 ro_vim_item_update.get("vim_status"),
546 ro_vim_item_update.get("vim_details")
547 if ro_vim_item_update.get("vim_status") != "ACTIVE"
548 else "",
549 )
550 )
551
552 return task_status, ro_vim_item_update
553
554 def exec(self, ro_task, task_index, task_depends):
555 task = ro_task["tasks"][task_index]
556 task_id = task["task_id"]
557 target_vim = self.my_vims[ro_task["target_id"]]
558 db_task_update = {"retries": 0}
559 retries = task.get("retries", 0)
560
561 try:
562 params = task["params"]
563 params_copy = deepcopy(params)
564 params_copy["ro_key"] = self.db.decrypt(
565 params_copy.pop("private_key"),
566 params_copy.pop("schema_version"),
567 params_copy.pop("salt"),
568 )
569 params_copy["ip_addr"] = params_copy.pop("ip_address")
570 target_vim.inject_user_key(**params_copy)
571 self.logger.debug(
572 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
573 )
574
575 return (
576 "DONE",
577 None,
578 db_task_update,
579 ) # params_copy["key"]
580 except (vimconn.VimConnException, NsWorkerException) as e:
581 retries += 1
582
583 if retries < self.max_retries_inject_ssh_key:
584 return (
585 "BUILD",
586 None,
587 {
588 "retries": retries,
589 "next_retry": self.time_retries_inject_ssh_key,
590 },
591 )
592
593 self.logger.error(
594 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
595 )
596 ro_vim_item_update = {"vim_details": str(e)}
597
598 return "FAILED", ro_vim_item_update, db_task_update
599
600
601 class VimInteractionImage(VimInteractionBase):
602 def new(self, ro_task, task_index, task_depends):
603 task = ro_task["tasks"][task_index]
604 task_id = task["task_id"]
605 created = False
606 created_items = {}
607 target_vim = self.my_vims[ro_task["target_id"]]
608
609 try:
610 # FIND
611 if task.get("find_params"):
612 vim_images = target_vim.get_image_list(**task["find_params"])
613
614 if not vim_images:
615 raise NsWorkerExceptionNotFound(
616 "Image not found with this criteria: '{}'".format(
617 task["find_params"]
618 )
619 )
620 elif len(vim_images) > 1:
621 raise NsWorkerException(
622 "More than one image found with this criteria: '{}'".format(
623 task["find_params"]
624 )
625 )
626 else:
627 vim_image_id = vim_images[0]["id"]
628
629 ro_vim_item_update = {
630 "vim_id": vim_image_id,
631 "vim_status": "DONE",
632 "created": created,
633 "created_items": created_items,
634 "vim_details": None,
635 }
636 self.logger.debug(
637 "task={} {} new-image={} created={}".format(
638 task_id, ro_task["target_id"], vim_image_id, created
639 )
640 )
641
642 return "DONE", ro_vim_item_update
643 except (NsWorkerException, vimconn.VimConnException) as e:
644 self.logger.error(
645 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
646 )
647 ro_vim_item_update = {
648 "vim_status": "VIM_ERROR",
649 "created": created,
650 "vim_details": str(e),
651 }
652
653 return "FAILED", ro_vim_item_update
654
655
656 class VimInteractionFlavor(VimInteractionBase):
657 def delete(self, ro_task, task_index):
658 task = ro_task["tasks"][task_index]
659 task_id = task["task_id"]
660 flavor_vim_id = ro_task["vim_info"]["vim_id"]
661 ro_vim_item_update_ok = {
662 "vim_status": "DELETED",
663 "created": False,
664 "vim_details": "DELETED",
665 "vim_id": None,
666 }
667
668 try:
669 if flavor_vim_id:
670 target_vim = self.my_vims[ro_task["target_id"]]
671 target_vim.delete_flavor(flavor_vim_id)
672 except vimconn.VimConnNotFoundException:
673 ro_vim_item_update_ok["vim_details"] = "already deleted"
674 except vimconn.VimConnException as e:
675 self.logger.error(
676 "ro_task={} vim={} del-flavor={}: {}".format(
677 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
678 )
679 )
680 ro_vim_item_update = {
681 "vim_status": "VIM_ERROR",
682 "vim_details": "Error while deleting: {}".format(e),
683 }
684
685 return "FAILED", ro_vim_item_update
686
687 self.logger.debug(
688 "task={} {} del-flavor={} {}".format(
689 task_id,
690 ro_task["target_id"],
691 flavor_vim_id,
692 ro_vim_item_update_ok.get("vim_details", ""),
693 )
694 )
695
696 return "DONE", ro_vim_item_update_ok
697
698 def new(self, ro_task, task_index, task_depends):
699 task = ro_task["tasks"][task_index]
700 task_id = task["task_id"]
701 created = False
702 created_items = {}
703 target_vim = self.my_vims[ro_task["target_id"]]
704
705 try:
706 # FIND
707 vim_flavor_id = None
708
709 if task.get("find_params"):
710 try:
711 flavor_data = task["find_params"]["flavor_data"]
712 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
713 except vimconn.VimConnNotFoundException:
714 pass
715
716 if not vim_flavor_id and task.get("params"):
717 # CREATE
718 flavor_data = task["params"]["flavor_data"]
719 vim_flavor_id = target_vim.new_flavor(flavor_data)
720 created = True
721
722 ro_vim_item_update = {
723 "vim_id": vim_flavor_id,
724 "vim_status": "DONE",
725 "created": created,
726 "created_items": created_items,
727 "vim_details": None,
728 }
729 self.logger.debug(
730 "task={} {} new-flavor={} created={}".format(
731 task_id, ro_task["target_id"], vim_flavor_id, created
732 )
733 )
734
735 return "DONE", ro_vim_item_update
736 except (vimconn.VimConnException, NsWorkerException) as e:
737 self.logger.error(
738 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
739 )
740 ro_vim_item_update = {
741 "vim_status": "VIM_ERROR",
742 "created": created,
743 "vim_details": str(e),
744 }
745
746 return "FAILED", ro_vim_item_update
747
748
749 class VimInteractionSdnNet(VimInteractionBase):
750 @staticmethod
751 def _match_pci(port_pci, mapping):
752 """
753 Check if port_pci matches with mapping
754 mapping can have brackets to indicate that several chars are accepted. e.g
755 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
756 :param port_pci: text
757 :param mapping: text, can contain brackets to indicate several chars are available
758 :return: True if matches, False otherwise
759 """
760 if not port_pci or not mapping:
761 return False
762 if port_pci == mapping:
763 return True
764
765 mapping_index = 0
766 pci_index = 0
767 while True:
768 bracket_start = mapping.find("[", mapping_index)
769
770 if bracket_start == -1:
771 break
772
773 bracket_end = mapping.find("]", bracket_start)
774 if bracket_end == -1:
775 break
776
777 length = bracket_start - mapping_index
778 if (
779 length
780 and port_pci[pci_index : pci_index + length]
781 != mapping[mapping_index:bracket_start]
782 ):
783 return False
784
785 if (
786 port_pci[pci_index + length]
787 not in mapping[bracket_start + 1 : bracket_end]
788 ):
789 return False
790
791 pci_index += length + 1
792 mapping_index = bracket_end + 1
793
794 if port_pci[pci_index:] != mapping[mapping_index:]:
795 return False
796
797 return True
798
799 def _get_interfaces(self, vlds_to_connect, vim_account_id):
800 """
801 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
802 :param vim_account_id:
803 :return:
804 """
805 interfaces = []
806
807 for vld in vlds_to_connect:
808 table, _, db_id = vld.partition(":")
809 db_id, _, vld = db_id.partition(":")
810 _, _, vld_id = vld.partition(".")
811
812 if table == "vnfrs":
813 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
814 iface_key = "vnf-vld-id"
815 else: # table == "nsrs"
816 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
817 iface_key = "ns-vld-id"
818
819 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
820
821 for db_vnfr in db_vnfrs:
822 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
823 for iface_index, interface in enumerate(vdur["interfaces"]):
824 if interface.get(iface_key) == vld_id and interface.get(
825 "type"
826 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
827 # only SR-IOV o PT
828 interface_ = interface.copy()
829 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
830 db_vnfr["_id"], vdu_index, iface_index
831 )
832
833 if vdur.get("status") == "ERROR":
834 interface_["status"] = "ERROR"
835
836 interfaces.append(interface_)
837
838 return interfaces
839
840 def refresh(self, ro_task):
841 # look for task create
842 task_create_index, _ = next(
843 i_t
844 for i_t in enumerate(ro_task["tasks"])
845 if i_t[1]
846 and i_t[1]["action"] == "CREATE"
847 and i_t[1]["status"] != "FINISHED"
848 )
849
850 return self.new(ro_task, task_create_index, None)
851
852 def new(self, ro_task, task_index, task_depends):
853
854 task = ro_task["tasks"][task_index]
855 task_id = task["task_id"]
856 target_vim = self.my_vims[ro_task["target_id"]]
857
858 sdn_net_id = ro_task["vim_info"]["vim_id"]
859
860 created_items = ro_task["vim_info"].get("created_items")
861 connected_ports = ro_task["vim_info"].get("connected_ports", [])
862 new_connected_ports = []
863 last_update = ro_task["vim_info"].get("last_update", 0)
864 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
865 error_list = []
866 created = ro_task["vim_info"].get("created", False)
867
868 try:
869 # CREATE
870 params = task["params"]
871 vlds_to_connect = params["vlds"]
872 associated_vim = params["target_vim"]
873 # external additional ports
874 additional_ports = params.get("sdn-ports") or ()
875 _, _, vim_account_id = associated_vim.partition(":")
876
877 if associated_vim:
878 # get associated VIM
879 if associated_vim not in self.db_vims:
880 self.db_vims[associated_vim] = self.db.get_one(
881 "vim_accounts", {"_id": vim_account_id}
882 )
883
884 db_vim = self.db_vims[associated_vim]
885
886 # look for ports to connect
887 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
888 # print(ports)
889
890 sdn_ports = []
891 pending_ports = error_ports = 0
892 vlan_used = None
893 sdn_need_update = False
894
895 for port in ports:
896 vlan_used = port.get("vlan") or vlan_used
897
898 # TODO. Do not connect if already done
899 if not port.get("compute_node") or not port.get("pci"):
900 if port.get("status") == "ERROR":
901 error_ports += 1
902 else:
903 pending_ports += 1
904 continue
905
906 pmap = None
907 compute_node_mappings = next(
908 (
909 c
910 for c in db_vim["config"].get("sdn-port-mapping", ())
911 if c and c["compute_node"] == port["compute_node"]
912 ),
913 None,
914 )
915
916 if compute_node_mappings:
917 # process port_mapping pci of type 0000:af:1[01].[1357]
918 pmap = next(
919 (
920 p
921 for p in compute_node_mappings["ports"]
922 if self._match_pci(port["pci"], p.get("pci"))
923 ),
924 None,
925 )
926
927 if not pmap:
928 if not db_vim["config"].get("mapping_not_needed"):
929 error_list.append(
930 "Port mapping not found for compute_node={} pci={}".format(
931 port["compute_node"], port["pci"]
932 )
933 )
934 continue
935
936 pmap = {}
937
938 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
939 new_port = {
940 "service_endpoint_id": pmap.get("service_endpoint_id")
941 or service_endpoint_id,
942 "service_endpoint_encapsulation_type": "dot1q"
943 if port["type"] == "SR-IOV"
944 else None,
945 "service_endpoint_encapsulation_info": {
946 "vlan": port.get("vlan"),
947 "mac": port.get("mac-address"),
948 "device_id": pmap.get("device_id") or port["compute_node"],
949 "device_interface_id": pmap.get("device_interface_id")
950 or port["pci"],
951 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
952 "switch_port": pmap.get("switch_port"),
953 "service_mapping_info": pmap.get("service_mapping_info"),
954 },
955 }
956
957 # TODO
958 # if port["modified_at"] > last_update:
959 # sdn_need_update = True
960 new_connected_ports.append(port["id"]) # TODO
961 sdn_ports.append(new_port)
962
963 if error_ports:
964 error_list.append(
965 "{} interfaces have not been created as VDU is on ERROR status".format(
966 error_ports
967 )
968 )
969
970 # connect external ports
971 for index, additional_port in enumerate(additional_ports):
972 additional_port_id = additional_port.get(
973 "service_endpoint_id"
974 ) or "external-{}".format(index)
975 sdn_ports.append(
976 {
977 "service_endpoint_id": additional_port_id,
978 "service_endpoint_encapsulation_type": additional_port.get(
979 "service_endpoint_encapsulation_type", "dot1q"
980 ),
981 "service_endpoint_encapsulation_info": {
982 "vlan": additional_port.get("vlan") or vlan_used,
983 "mac": additional_port.get("mac_address"),
984 "device_id": additional_port.get("device_id"),
985 "device_interface_id": additional_port.get(
986 "device_interface_id"
987 ),
988 "switch_dpid": additional_port.get("switch_dpid")
989 or additional_port.get("switch_id"),
990 "switch_port": additional_port.get("switch_port"),
991 "service_mapping_info": additional_port.get(
992 "service_mapping_info"
993 ),
994 },
995 }
996 )
997 new_connected_ports.append(additional_port_id)
998 sdn_info = ""
999
1000 # if there are more ports to connect or they have been modified, call create/update
1001 if error_list:
1002 sdn_status = "ERROR"
1003 sdn_info = "; ".join(error_list)
1004 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1005 last_update = time.time()
1006
1007 if not sdn_net_id:
1008 if len(sdn_ports) < 2:
1009 sdn_status = "ACTIVE"
1010
1011 if not pending_ports:
1012 self.logger.debug(
1013 "task={} {} new-sdn-net done, less than 2 ports".format(
1014 task_id, ro_task["target_id"]
1015 )
1016 )
1017 else:
1018 net_type = params.get("type") or "ELAN"
1019 (
1020 sdn_net_id,
1021 created_items,
1022 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1023 created = True
1024 self.logger.debug(
1025 "task={} {} new-sdn-net={} created={}".format(
1026 task_id, ro_task["target_id"], sdn_net_id, created
1027 )
1028 )
1029 else:
1030 created_items = target_vim.edit_connectivity_service(
1031 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1032 )
1033 created = True
1034 self.logger.debug(
1035 "task={} {} update-sdn-net={} created={}".format(
1036 task_id, ro_task["target_id"], sdn_net_id, created
1037 )
1038 )
1039
1040 connected_ports = new_connected_ports
1041 elif sdn_net_id:
1042 wim_status_dict = target_vim.get_connectivity_service_status(
1043 sdn_net_id, conn_info=created_items
1044 )
1045 sdn_status = wim_status_dict["sdn_status"]
1046
1047 if wim_status_dict.get("sdn_info"):
1048 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1049
1050 if wim_status_dict.get("error_msg"):
1051 sdn_info = wim_status_dict.get("error_msg") or ""
1052
1053 if pending_ports:
1054 if sdn_status != "ERROR":
1055 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1056 len(ports) - pending_ports, len(ports)
1057 )
1058
1059 if sdn_status == "ACTIVE":
1060 sdn_status = "BUILD"
1061
1062 ro_vim_item_update = {
1063 "vim_id": sdn_net_id,
1064 "vim_status": sdn_status,
1065 "created": created,
1066 "created_items": created_items,
1067 "connected_ports": connected_ports,
1068 "vim_details": sdn_info,
1069 "last_update": last_update,
1070 }
1071
1072 return sdn_status, ro_vim_item_update
1073 except Exception as e:
1074 self.logger.error(
1075 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1076 exc_info=not isinstance(
1077 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1078 ),
1079 )
1080 ro_vim_item_update = {
1081 "vim_status": "VIM_ERROR",
1082 "created": created,
1083 "vim_details": str(e),
1084 }
1085
1086 return "FAILED", ro_vim_item_update
1087
1088 def delete(self, ro_task, task_index):
1089 task = ro_task["tasks"][task_index]
1090 task_id = task["task_id"]
1091 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1092 ro_vim_item_update_ok = {
1093 "vim_status": "DELETED",
1094 "created": False,
1095 "vim_details": "DELETED",
1096 "vim_id": None,
1097 }
1098
1099 try:
1100 if sdn_vim_id:
1101 target_vim = self.my_vims[ro_task["target_id"]]
1102 target_vim.delete_connectivity_service(
1103 sdn_vim_id, ro_task["vim_info"].get("created_items")
1104 )
1105
1106 except Exception as e:
1107 if (
1108 isinstance(e, sdnconn.SdnConnectorError)
1109 and e.http_code == HTTPStatus.NOT_FOUND.value
1110 ):
1111 ro_vim_item_update_ok["vim_details"] = "already deleted"
1112 else:
1113 self.logger.error(
1114 "ro_task={} vim={} del-sdn-net={}: {}".format(
1115 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1116 ),
1117 exc_info=not isinstance(
1118 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1119 ),
1120 )
1121 ro_vim_item_update = {
1122 "vim_status": "VIM_ERROR",
1123 "vim_details": "Error while deleting: {}".format(e),
1124 }
1125
1126 return "FAILED", ro_vim_item_update
1127
1128 self.logger.debug(
1129 "task={} {} del-sdn-net={} {}".format(
1130 task_id,
1131 ro_task["target_id"],
1132 sdn_vim_id,
1133 ro_vim_item_update_ok.get("vim_details", ""),
1134 )
1135 )
1136
1137 return "DONE", ro_vim_item_update_ok
1138
1139
1140 class NsWorker(threading.Thread):
1141 REFRESH_BUILD = 5 # 5 seconds
1142 REFRESH_ACTIVE = 60 # 1 minute
1143 REFRESH_ERROR = 600
1144 REFRESH_IMAGE = 3600 * 10
1145 REFRESH_DELETE = 3600 * 10
1146 QUEUE_SIZE = 100
1147 terminate = False
1148
1149 def __init__(self, worker_index, config, plugins, db):
1150 """
1151
1152 :param worker_index: thread index
1153 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1154 :param plugins: global shared dict with the loaded plugins
1155 :param db: database class instance to use
1156 """
1157 threading.Thread.__init__(self)
1158 self.config = config
1159 self.plugins = plugins
1160 self.plugin_name = "unknown"
1161 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1162 self.worker_index = worker_index
1163 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1164 # targetvim: vimplugin class
1165 self.my_vims = {}
1166 # targetvim: vim information from database
1167 self.db_vims = {}
1168 # targetvim list
1169 self.vim_targets = []
1170 self.my_id = config["process_id"] + ":" + str(worker_index)
1171 self.db = db
1172 self.item2class = {
1173 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1174 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1175 "image": VimInteractionImage(
1176 self.db, self.my_vims, self.db_vims, self.logger
1177 ),
1178 "flavor": VimInteractionFlavor(
1179 self.db, self.my_vims, self.db_vims, self.logger
1180 ),
1181 "sdn_net": VimInteractionSdnNet(
1182 self.db, self.my_vims, self.db_vims, self.logger
1183 ),
1184 }
1185 self.time_last_task_processed = None
1186 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1187 self.tasks_to_delete = []
1188 # it is idle when there are not vim_targets associated
1189 self.idle = True
1190 self.task_locked_time = config["global"]["task_locked_time"]
1191
1192 def insert_task(self, task):
1193 try:
1194 self.task_queue.put(task, False)
1195 return None
1196 except queue.Full:
1197 raise NsWorkerException("timeout inserting a task")
1198
1199 def terminate(self):
1200 self.insert_task("exit")
1201
1202 def del_task(self, task):
1203 with self.task_lock:
1204 if task["status"] == "SCHEDULED":
1205 task["status"] = "SUPERSEDED"
1206 return True
1207 else: # task["status"] == "processing"
1208 self.task_lock.release()
1209 return False
1210
1211 def _process_vim_config(self, target_id, db_vim):
1212 """
1213 Process vim config, creating vim configuration files as ca_cert
1214 :param target_id: vim/sdn/wim + id
1215 :param db_vim: Vim dictionary obtained from database
1216 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1217 """
1218 if not db_vim.get("config"):
1219 return
1220
1221 file_name = ""
1222
1223 try:
1224 if db_vim["config"].get("ca_cert_content"):
1225 file_name = "{}:{}".format(target_id, self.worker_index)
1226
1227 try:
1228 mkdir(file_name)
1229 except FileExistsError:
1230 pass
1231
1232 file_name = file_name + "/ca_cert"
1233
1234 with open(file_name, "w") as f:
1235 f.write(db_vim["config"]["ca_cert_content"])
1236 del db_vim["config"]["ca_cert_content"]
1237 db_vim["config"]["ca_cert"] = file_name
1238 except Exception as e:
1239 raise NsWorkerException(
1240 "Error writing to file '{}': {}".format(file_name, e)
1241 )
1242
1243 def _load_plugin(self, name, type="vim"):
1244 # type can be vim or sdn
1245 if "rovim_dummy" not in self.plugins:
1246 self.plugins["rovim_dummy"] = VimDummyConnector
1247
1248 if "rosdn_dummy" not in self.plugins:
1249 self.plugins["rosdn_dummy"] = SdnDummyConnector
1250
1251 if name in self.plugins:
1252 return self.plugins[name]
1253
1254 try:
1255 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1256 self.plugins[name] = ep.load()
1257 except Exception as e:
1258 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1259
1260 if name and name not in self.plugins:
1261 raise NsWorkerException(
1262 "Plugin 'osm_{n}' has not been installed".format(n=name)
1263 )
1264
1265 return self.plugins[name]
1266
1267 def _unload_vim(self, target_id):
1268 """
1269 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1270 :param target_id: Contains type:_id; where type can be 'vim', ...
1271 :return: None.
1272 """
1273 try:
1274 self.db_vims.pop(target_id, None)
1275 self.my_vims.pop(target_id, None)
1276
1277 if target_id in self.vim_targets:
1278 self.vim_targets.remove(target_id)
1279
1280 self.logger.info("Unloaded {}".format(target_id))
1281 rmtree("{}:{}".format(target_id, self.worker_index))
1282 except FileNotFoundError:
1283 pass # this is raised by rmtree if folder does not exist
1284 except Exception as e:
1285 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1286
1287 def _check_vim(self, target_id):
1288 """
1289 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1290 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1291 :return: None.
1292 """
1293 target, _, _id = target_id.partition(":")
1294 now = time.time()
1295 update_dict = {}
1296 unset_dict = {}
1297 op_text = ""
1298 step = ""
1299 loaded = target_id in self.vim_targets
1300 target_database = (
1301 "vim_accounts"
1302 if target == "vim"
1303 else "wim_accounts"
1304 if target == "wim"
1305 else "sdns"
1306 )
1307
1308 try:
1309 step = "Getting {} from db".format(target_id)
1310 db_vim = self.db.get_one(target_database, {"_id": _id})
1311
1312 for op_index, operation in enumerate(
1313 db_vim["_admin"].get("operations", ())
1314 ):
1315 if operation["operationState"] != "PROCESSING":
1316 continue
1317
1318 locked_at = operation.get("locked_at")
1319
1320 if locked_at is not None and locked_at >= now - self.task_locked_time:
1321 # some other thread is doing this operation
1322 return
1323
1324 # lock
1325 op_text = "_admin.operations.{}.".format(op_index)
1326
1327 if not self.db.set_one(
1328 target_database,
1329 q_filter={
1330 "_id": _id,
1331 op_text + "operationState": "PROCESSING",
1332 op_text + "locked_at": locked_at,
1333 },
1334 update_dict={
1335 op_text + "locked_at": now,
1336 "admin.current_operation": op_index,
1337 },
1338 fail_on_empty=False,
1339 ):
1340 return
1341
1342 unset_dict[op_text + "locked_at"] = None
1343 unset_dict["current_operation"] = None
1344 step = "Loading " + target_id
1345 error_text = self._load_vim(target_id)
1346
1347 if not error_text:
1348 step = "Checking connectivity"
1349
1350 if target == "vim":
1351 self.my_vims[target_id].check_vim_connectivity()
1352 else:
1353 self.my_vims[target_id].check_credentials()
1354
1355 update_dict["_admin.operationalState"] = "ENABLED"
1356 update_dict["_admin.detailed-status"] = ""
1357 unset_dict[op_text + "detailed-status"] = None
1358 update_dict[op_text + "operationState"] = "COMPLETED"
1359
1360 return
1361
1362 except Exception as e:
1363 error_text = "{}: {}".format(step, e)
1364 self.logger.error("{} for {}: {}".format(step, target_id, e))
1365
1366 finally:
1367 if update_dict or unset_dict:
1368 if error_text:
1369 update_dict[op_text + "operationState"] = "FAILED"
1370 update_dict[op_text + "detailed-status"] = error_text
1371 unset_dict.pop(op_text + "detailed-status", None)
1372 update_dict["_admin.operationalState"] = "ERROR"
1373 update_dict["_admin.detailed-status"] = error_text
1374
1375 if op_text:
1376 update_dict[op_text + "statusEnteredTime"] = now
1377
1378 self.db.set_one(
1379 target_database,
1380 q_filter={"_id": _id},
1381 update_dict=update_dict,
1382 unset=unset_dict,
1383 fail_on_empty=False,
1384 )
1385
1386 if not loaded:
1387 self._unload_vim(target_id)
1388
1389 def _reload_vim(self, target_id):
1390 if target_id in self.vim_targets:
1391 self._load_vim(target_id)
1392 else:
1393 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1394 # just remove it to force load again next time it is needed
1395 self.db_vims.pop(target_id, None)
1396
1397 def _load_vim(self, target_id):
1398 """
1399 Load or reload a vim_account, sdn_controller or wim_account.
1400 Read content from database, load the plugin if not loaded.
1401 In case of error loading the plugin, it load a failing VIM_connector
1402 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1403 :param target_id: Contains type:_id; where type can be 'vim', ...
1404 :return: None if ok, descriptive text if error
1405 """
1406 target, _, _id = target_id.partition(":")
1407 target_database = (
1408 "vim_accounts"
1409 if target == "vim"
1410 else "wim_accounts"
1411 if target == "wim"
1412 else "sdns"
1413 )
1414 plugin_name = ""
1415 vim = None
1416
1417 try:
1418 step = "Getting {}={} from db".format(target, _id)
1419 # TODO process for wim, sdnc, ...
1420 vim = self.db.get_one(target_database, {"_id": _id})
1421
1422 # if deep_get(vim, "config", "sdn-controller"):
1423 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1424 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1425
1426 step = "Decrypting password"
1427 schema_version = vim.get("schema_version")
1428 self.db.encrypt_decrypt_fields(
1429 vim,
1430 "decrypt",
1431 fields=("password", "secret"),
1432 schema_version=schema_version,
1433 salt=_id,
1434 )
1435 self._process_vim_config(target_id, vim)
1436
1437 if target == "vim":
1438 plugin_name = "rovim_" + vim["vim_type"]
1439 step = "Loading plugin '{}'".format(plugin_name)
1440 vim_module_conn = self._load_plugin(plugin_name)
1441 step = "Loading {}'".format(target_id)
1442 self.my_vims[target_id] = vim_module_conn(
1443 uuid=vim["_id"],
1444 name=vim["name"],
1445 tenant_id=vim.get("vim_tenant_id"),
1446 tenant_name=vim.get("vim_tenant_name"),
1447 url=vim["vim_url"],
1448 url_admin=None,
1449 user=vim["vim_user"],
1450 passwd=vim["vim_password"],
1451 config=vim.get("config") or {},
1452 persistent_info={},
1453 )
1454 else: # sdn
1455 plugin_name = "rosdn_" + vim["type"]
1456 step = "Loading plugin '{}'".format(plugin_name)
1457 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1458 step = "Loading {}'".format(target_id)
1459 wim = deepcopy(vim)
1460 wim_config = wim.pop("config", {}) or {}
1461 wim["uuid"] = wim["_id"]
1462 wim["wim_url"] = wim["url"]
1463
1464 if wim.get("dpid"):
1465 wim_config["dpid"] = wim.pop("dpid")
1466
1467 if wim.get("switch_id"):
1468 wim_config["switch_id"] = wim.pop("switch_id")
1469
1470 # wim, wim_account, config
1471 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1472 self.db_vims[target_id] = vim
1473 self.error_status = None
1474
1475 self.logger.info(
1476 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1477 )
1478 except Exception as e:
1479 self.logger.error(
1480 "Cannot load {} plugin={}: {} {}".format(
1481 target_id, plugin_name, step, e
1482 )
1483 )
1484
1485 self.db_vims[target_id] = vim or {}
1486 self.db_vims[target_id] = FailingConnector(str(e))
1487 error_status = "{} Error: {}".format(step, e)
1488
1489 return error_status
1490 finally:
1491 if target_id not in self.vim_targets:
1492 self.vim_targets.append(target_id)
1493
1494 def _get_db_task(self):
1495 """
1496 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1497 :return: None
1498 """
1499 now = time.time()
1500
1501 if not self.time_last_task_processed:
1502 self.time_last_task_processed = now
1503
1504 try:
1505 while True:
1506 locked = self.db.set_one(
1507 "ro_tasks",
1508 q_filter={
1509 "target_id": self.vim_targets,
1510 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1511 "locked_at.lt": now - self.task_locked_time,
1512 "to_check_at.lt": self.time_last_task_processed,
1513 },
1514 update_dict={"locked_by": self.my_id, "locked_at": now},
1515 fail_on_empty=False,
1516 )
1517
1518 if locked:
1519 # read and return
1520 ro_task = self.db.get_one(
1521 "ro_tasks",
1522 q_filter={
1523 "target_id": self.vim_targets,
1524 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1525 "locked_at": now,
1526 },
1527 )
1528 return ro_task
1529
1530 if self.time_last_task_processed == now:
1531 self.time_last_task_processed = None
1532 return None
1533 else:
1534 self.time_last_task_processed = now
1535 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1536
1537 except DbException as e:
1538 self.logger.error("Database exception at _get_db_task: {}".format(e))
1539 except Exception as e:
1540 self.logger.critical(
1541 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1542 )
1543
1544 return None
1545
1546 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1547 """
1548 Determine if this task need to be done or superseded
1549 :return: None
1550 """
1551 my_task = ro_task["tasks"][task_index]
1552 task_id = my_task["task_id"]
1553 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1554 "created_items", False
1555 )
1556
1557 if my_task["status"] == "FAILED":
1558 return None, None # TODO need to be retry??
1559
1560 try:
1561 for index, task in enumerate(ro_task["tasks"]):
1562 if index == task_index or not task:
1563 continue # own task
1564
1565 if (
1566 my_task["target_record"] == task["target_record"]
1567 and task["action"] == "CREATE"
1568 ):
1569 # set to finished
1570 db_update["tasks.{}.status".format(index)] = task[
1571 "status"
1572 ] = "FINISHED"
1573 elif task["action"] == "CREATE" and task["status"] not in (
1574 "FINISHED",
1575 "SUPERSEDED",
1576 ):
1577 needed_delete = False
1578
1579 if needed_delete:
1580 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1581 else:
1582 return "SUPERSEDED", None
1583 except Exception as e:
1584 if not isinstance(e, NsWorkerException):
1585 self.logger.critical(
1586 "Unexpected exception at _delete_task task={}: {}".format(
1587 task_id, e
1588 ),
1589 exc_info=True,
1590 )
1591
1592 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1593
1594 def _create_task(self, ro_task, task_index, task_depends, db_update):
1595 """
1596 Determine if this task need to create something at VIM
1597 :return: None
1598 """
1599 my_task = ro_task["tasks"][task_index]
1600 task_id = my_task["task_id"]
1601 task_status = None
1602
1603 if my_task["status"] == "FAILED":
1604 return None, None # TODO need to be retry??
1605 elif my_task["status"] == "SCHEDULED":
1606 # check if already created by another task
1607 for index, task in enumerate(ro_task["tasks"]):
1608 if index == task_index or not task:
1609 continue # own task
1610
1611 if task["action"] == "CREATE" and task["status"] not in (
1612 "SCHEDULED",
1613 "FINISHED",
1614 "SUPERSEDED",
1615 ):
1616 return task["status"], "COPY_VIM_INFO"
1617
1618 try:
1619 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1620 ro_task, task_index, task_depends
1621 )
1622 # TODO update other CREATE tasks
1623 except Exception as e:
1624 if not isinstance(e, NsWorkerException):
1625 self.logger.error(
1626 "Error executing task={}: {}".format(task_id, e), exc_info=True
1627 )
1628
1629 task_status = "FAILED"
1630 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1631 # TODO update ro_vim_item_update
1632
1633 return task_status, ro_vim_item_update
1634 else:
1635 return None, None
1636
1637 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1638 """
1639 Look for dependency task
1640 :param task_id: Can be one of
1641 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1642 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1643 3. task.task_id: "<action_id>:number"
1644 :param ro_task:
1645 :param target_id:
1646 :return: database ro_task plus index of task
1647 """
1648 if (
1649 task_id.startswith("vim:")
1650 or task_id.startswith("sdn:")
1651 or task_id.startswith("wim:")
1652 ):
1653 target_id, _, task_id = task_id.partition(" ")
1654
1655 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1656 ro_task_dependency = self.db.get_one(
1657 "ro_tasks",
1658 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1659 fail_on_empty=False,
1660 )
1661
1662 if ro_task_dependency:
1663 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1664 if task["target_record_id"] == task_id:
1665 return ro_task_dependency, task_index
1666
1667 else:
1668 if ro_task:
1669 for task_index, task in enumerate(ro_task["tasks"]):
1670 if task and task["task_id"] == task_id:
1671 return ro_task, task_index
1672
1673 ro_task_dependency = self.db.get_one(
1674 "ro_tasks",
1675 q_filter={
1676 "tasks.ANYINDEX.task_id": task_id,
1677 "tasks.ANYINDEX.target_record.ne": None,
1678 },
1679 fail_on_empty=False,
1680 )
1681
1682 if ro_task_dependency:
1683 for task_index, task in ro_task_dependency["tasks"]:
1684 if task["task_id"] == task_id:
1685 return ro_task_dependency, task_index
1686 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1687
1688 def _process_pending_tasks(self, ro_task):
1689 ro_task_id = ro_task["_id"]
1690 now = time.time()
1691 # one day
1692 next_check_at = now + (24 * 60 * 60)
1693 db_ro_task_update = {}
1694
1695 def _update_refresh(new_status):
1696 # compute next_refresh
1697 nonlocal task
1698 nonlocal next_check_at
1699 nonlocal db_ro_task_update
1700 nonlocal ro_task
1701
1702 next_refresh = time.time()
1703
1704 if task["item"] in ("image", "flavor"):
1705 next_refresh += self.REFRESH_IMAGE
1706 elif new_status == "BUILD":
1707 next_refresh += self.REFRESH_BUILD
1708 elif new_status == "DONE":
1709 next_refresh += self.REFRESH_ACTIVE
1710 else:
1711 next_refresh += self.REFRESH_ERROR
1712
1713 next_check_at = min(next_check_at, next_refresh)
1714 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1715 ro_task["vim_info"]["refresh_at"] = next_refresh
1716
1717 try:
1718 # 0: get task_status_create
1719 lock_object = None
1720 task_status_create = None
1721 task_create = next(
1722 (
1723 t
1724 for t in ro_task["tasks"]
1725 if t
1726 and t["action"] == "CREATE"
1727 and t["status"] in ("BUILD", "DONE")
1728 ),
1729 None,
1730 )
1731
1732 if task_create:
1733 task_status_create = task_create["status"]
1734
1735 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1736 for task_action in ("DELETE", "CREATE", "EXEC"):
1737 db_vim_update = None
1738 new_status = None
1739
1740 for task_index, task in enumerate(ro_task["tasks"]):
1741 if not task:
1742 continue # task deleted
1743
1744 task_depends = {}
1745 target_update = None
1746
1747 if (
1748 (
1749 task_action in ("DELETE", "EXEC")
1750 and task["status"] not in ("SCHEDULED", "BUILD")
1751 )
1752 or task["action"] != task_action
1753 or (
1754 task_action == "CREATE"
1755 and task["status"] in ("FINISHED", "SUPERSEDED")
1756 )
1757 ):
1758 continue
1759
1760 task_path = "tasks.{}.status".format(task_index)
1761 try:
1762 db_vim_info_update = None
1763
1764 if task["status"] == "SCHEDULED":
1765 # check if tasks that this depends on have been completed
1766 dependency_not_completed = False
1767
1768 for dependency_task_id in task.get("depends_on") or ():
1769 (
1770 dependency_ro_task,
1771 dependency_task_index,
1772 ) = self._get_dependency(
1773 dependency_task_id, target_id=ro_task["target_id"]
1774 )
1775 dependency_task = dependency_ro_task["tasks"][
1776 dependency_task_index
1777 ]
1778
1779 if dependency_task["status"] == "SCHEDULED":
1780 dependency_not_completed = True
1781 next_check_at = min(
1782 next_check_at, dependency_ro_task["to_check_at"]
1783 )
1784 # must allow dependent task to be processed first
1785 # to do this set time after last_task_processed
1786 next_check_at = max(
1787 self.time_last_task_processed, next_check_at
1788 )
1789 break
1790 elif dependency_task["status"] == "FAILED":
1791 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1792 task["action"],
1793 task["item"],
1794 dependency_task["action"],
1795 dependency_task["item"],
1796 dependency_task_id,
1797 dependency_ro_task["vim_info"].get(
1798 "vim_details"
1799 ),
1800 )
1801 self.logger.error(
1802 "task={} {}".format(task["task_id"], error_text)
1803 )
1804 raise NsWorkerException(error_text)
1805
1806 task_depends[dependency_task_id] = dependency_ro_task[
1807 "vim_info"
1808 ]["vim_id"]
1809 task_depends[
1810 "TASK-{}".format(dependency_task_id)
1811 ] = dependency_ro_task["vim_info"]["vim_id"]
1812
1813 if dependency_not_completed:
1814 # TODO set at vim_info.vim_details that it is waiting
1815 continue
1816
1817 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1818 # the task of renew this locking. It will update database locket_at periodically
1819 if not lock_object:
1820 lock_object = LockRenew.add_lock_object(
1821 "ro_tasks", ro_task, self
1822 )
1823
1824 if task["action"] == "DELETE":
1825 (new_status, db_vim_info_update,) = self._delete_task(
1826 ro_task, task_index, task_depends, db_ro_task_update
1827 )
1828 new_status = (
1829 "FINISHED" if new_status == "DONE" else new_status
1830 )
1831 # ^with FINISHED instead of DONE it will not be refreshing
1832
1833 if new_status in ("FINISHED", "SUPERSEDED"):
1834 target_update = "DELETE"
1835 elif task["action"] == "EXEC":
1836 (
1837 new_status,
1838 db_vim_info_update,
1839 db_task_update,
1840 ) = self.item2class[task["item"]].exec(
1841 ro_task, task_index, task_depends
1842 )
1843 new_status = (
1844 "FINISHED" if new_status == "DONE" else new_status
1845 )
1846 # ^with FINISHED instead of DONE it will not be refreshing
1847
1848 if db_task_update:
1849 # load into database the modified db_task_update "retries" and "next_retry"
1850 if db_task_update.get("retries"):
1851 db_ro_task_update[
1852 "tasks.{}.retries".format(task_index)
1853 ] = db_task_update["retries"]
1854
1855 next_check_at = time.time() + db_task_update.get(
1856 "next_retry", 60
1857 )
1858 target_update = None
1859 elif task["action"] == "CREATE":
1860 if task["status"] == "SCHEDULED":
1861 if task_status_create:
1862 new_status = task_status_create
1863 target_update = "COPY_VIM_INFO"
1864 else:
1865 new_status, db_vim_info_update = self.item2class[
1866 task["item"]
1867 ].new(ro_task, task_index, task_depends)
1868 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
1869 _update_refresh(new_status)
1870 else:
1871 if (
1872 ro_task["vim_info"]["refresh_at"]
1873 and now > ro_task["vim_info"]["refresh_at"]
1874 ):
1875 new_status, db_vim_info_update = self.item2class[
1876 task["item"]
1877 ].refresh(ro_task)
1878 _update_refresh(new_status)
1879
1880 except Exception as e:
1881 new_status = "FAILED"
1882 db_vim_info_update = {
1883 "vim_status": "VIM_ERROR",
1884 "vim_details": str(e),
1885 }
1886
1887 if not isinstance(
1888 e, (NsWorkerException, vimconn.VimConnException)
1889 ):
1890 self.logger.error(
1891 "Unexpected exception at _delete_task task={}: {}".format(
1892 task["task_id"], e
1893 ),
1894 exc_info=True,
1895 )
1896
1897 try:
1898 if db_vim_info_update:
1899 db_vim_update = db_vim_info_update.copy()
1900 db_ro_task_update.update(
1901 {
1902 "vim_info." + k: v
1903 for k, v in db_vim_info_update.items()
1904 }
1905 )
1906 ro_task["vim_info"].update(db_vim_info_update)
1907
1908 if new_status:
1909 if task_action == "CREATE":
1910 task_status_create = new_status
1911 db_ro_task_update[task_path] = new_status
1912
1913 if target_update or db_vim_update:
1914 if target_update == "DELETE":
1915 self._update_target(task, None)
1916 elif target_update == "COPY_VIM_INFO":
1917 self._update_target(task, ro_task["vim_info"])
1918 else:
1919 self._update_target(task, db_vim_update)
1920
1921 except Exception as e:
1922 if (
1923 isinstance(e, DbException)
1924 and e.http_code == HTTPStatus.NOT_FOUND
1925 ):
1926 # if the vnfrs or nsrs has been removed from database, this task must be removed
1927 self.logger.debug(
1928 "marking to delete task={}".format(task["task_id"])
1929 )
1930 self.tasks_to_delete.append(task)
1931 else:
1932 self.logger.error(
1933 "Unexpected exception at _update_target task={}: {}".format(
1934 task["task_id"], e
1935 ),
1936 exc_info=True,
1937 )
1938
1939 locked_at = ro_task["locked_at"]
1940
1941 if lock_object:
1942 locked_at = [
1943 lock_object["locked_at"],
1944 lock_object["locked_at"] + self.task_locked_time,
1945 ]
1946 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
1947 # contain exactly locked_at + self.task_locked_time
1948 LockRenew.remove_lock_object(lock_object)
1949
1950 q_filter = {
1951 "_id": ro_task["_id"],
1952 "to_check_at": ro_task["to_check_at"],
1953 "locked_at": locked_at,
1954 }
1955 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
1956 # outside this task (by ro_nbi) do not update it
1957 db_ro_task_update["locked_by"] = None
1958 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
1959 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
1960 db_ro_task_update["modified_at"] = now
1961 db_ro_task_update["to_check_at"] = next_check_at
1962
1963 if not self.db.set_one(
1964 "ro_tasks",
1965 update_dict=db_ro_task_update,
1966 q_filter=q_filter,
1967 fail_on_empty=False,
1968 ):
1969 del db_ro_task_update["to_check_at"]
1970 del q_filter["to_check_at"]
1971 self.db.set_one(
1972 "ro_tasks",
1973 q_filter=q_filter,
1974 update_dict=db_ro_task_update,
1975 fail_on_empty=True,
1976 )
1977 except DbException as e:
1978 self.logger.error(
1979 "ro_task={} Error updating database {}".format(ro_task_id, e)
1980 )
1981 except Exception as e:
1982 self.logger.error(
1983 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
1984 )
1985
1986 def _update_target(self, task, ro_vim_item_update):
1987 table, _, temp = task["target_record"].partition(":")
1988 _id, _, path_vim_status = temp.partition(":")
1989 path_item = path_vim_status[: path_vim_status.rfind(".")]
1990 path_item = path_item[: path_item.rfind(".")]
1991 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
1992 # path_item: dot separated list targeting record information, e.g. "vdur.10"
1993
1994 if ro_vim_item_update:
1995 update_dict = {
1996 path_vim_status + "." + k: v
1997 for k, v in ro_vim_item_update.items()
1998 if k
1999 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2000 }
2001
2002 if path_vim_status.startswith("vdur."):
2003 # for backward compatibility, add vdur.name apart from vdur.vim_name
2004 if ro_vim_item_update.get("vim_name"):
2005 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2006
2007 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2008 if ro_vim_item_update.get("vim_id"):
2009 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2010
2011 # update general status
2012 if ro_vim_item_update.get("vim_status"):
2013 update_dict[path_item + ".status"] = ro_vim_item_update[
2014 "vim_status"
2015 ]
2016
2017 if ro_vim_item_update.get("interfaces"):
2018 path_interfaces = path_item + ".interfaces"
2019
2020 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2021 if iface:
2022 update_dict.update(
2023 {
2024 path_interfaces + ".{}.".format(i) + k: v
2025 for k, v in iface.items()
2026 if k in ("vlan", "compute_node", "pci")
2027 }
2028 )
2029
2030 # put ip_address and mac_address with ip-address and mac-address
2031 if iface.get("ip_address"):
2032 update_dict[
2033 path_interfaces + ".{}.".format(i) + "ip-address"
2034 ] = iface["ip_address"]
2035
2036 if iface.get("mac_address"):
2037 update_dict[
2038 path_interfaces + ".{}.".format(i) + "mac-address"
2039 ] = iface["mac_address"]
2040
2041 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2042 update_dict["ip-address"] = iface.get("ip_address").split(
2043 ";"
2044 )[0]
2045
2046 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2047 update_dict[path_item + ".ip-address"] = iface.get(
2048 "ip_address"
2049 ).split(";")[0]
2050
2051 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2052 else:
2053 update_dict = {path_item + ".status": "DELETED"}
2054 self.db.set_one(
2055 table,
2056 q_filter={"_id": _id},
2057 update_dict=update_dict,
2058 unset={path_vim_status: None},
2059 )
2060
2061 def _process_delete_db_tasks(self):
2062 """
2063 Delete task from database because vnfrs or nsrs or both have been deleted
2064 :return: None. Uses and modify self.tasks_to_delete
2065 """
2066 while self.tasks_to_delete:
2067 task = self.tasks_to_delete[0]
2068 vnfrs_deleted = None
2069 nsr_id = task["nsr_id"]
2070
2071 if task["target_record"].startswith("vnfrs:"):
2072 # check if nsrs is present
2073 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2074 vnfrs_deleted = task["target_record"].split(":")[1]
2075
2076 try:
2077 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2078 except Exception as e:
2079 self.logger.error(
2080 "Error deleting task={}: {}".format(task["task_id"], e)
2081 )
2082 self.tasks_to_delete.pop(0)
2083
2084 @staticmethod
2085 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2086 """
2087 Static method because it is called from osm_ng_ro.ns
2088 :param db: instance of database to use
2089 :param nsr_id: affected nsrs id
2090 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2091 :return: None, exception is fails
2092 """
2093 retries = 5
2094 for retry in range(retries):
2095 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2096 now = time.time()
2097 conflict = False
2098
2099 for ro_task in ro_tasks:
2100 db_update = {}
2101 to_delete_ro_task = True
2102
2103 for index, task in enumerate(ro_task["tasks"]):
2104 if not task:
2105 pass
2106 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2107 vnfrs_deleted
2108 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2109 ):
2110 db_update["tasks.{}".format(index)] = None
2111 else:
2112 # used by other nsr, ro_task cannot be deleted
2113 to_delete_ro_task = False
2114
2115 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2116 if to_delete_ro_task:
2117 if not db.del_one(
2118 "ro_tasks",
2119 q_filter={
2120 "_id": ro_task["_id"],
2121 "modified_at": ro_task["modified_at"],
2122 },
2123 fail_on_empty=False,
2124 ):
2125 conflict = True
2126 elif db_update:
2127 db_update["modified_at"] = now
2128 if not db.set_one(
2129 "ro_tasks",
2130 q_filter={
2131 "_id": ro_task["_id"],
2132 "modified_at": ro_task["modified_at"],
2133 },
2134 update_dict=db_update,
2135 fail_on_empty=False,
2136 ):
2137 conflict = True
2138 if not conflict:
2139 return
2140 else:
2141 raise NsWorkerException("Exceeded {} retries".format(retries))
2142
2143 def run(self):
2144 # load database
2145 self.logger.info("Starting")
2146 while True:
2147 # step 1: get commands from queue
2148 try:
2149 if self.vim_targets:
2150 task = self.task_queue.get(block=False)
2151 else:
2152 if not self.idle:
2153 self.logger.debug("enters in idle state")
2154 self.idle = True
2155 task = self.task_queue.get(block=True)
2156 self.idle = False
2157
2158 if task[0] == "terminate":
2159 break
2160 elif task[0] == "load_vim":
2161 self.logger.info("order to load vim {}".format(task[1]))
2162 self._load_vim(task[1])
2163 elif task[0] == "unload_vim":
2164 self.logger.info("order to unload vim {}".format(task[1]))
2165 self._unload_vim(task[1])
2166 elif task[0] == "reload_vim":
2167 self._reload_vim(task[1])
2168 elif task[0] == "check_vim":
2169 self.logger.info("order to check vim {}".format(task[1]))
2170 self._check_vim(task[1])
2171 continue
2172 except Exception as e:
2173 if isinstance(e, queue.Empty):
2174 pass
2175 else:
2176 self.logger.critical(
2177 "Error processing task: {}".format(e), exc_info=True
2178 )
2179
2180 # step 2: process pending_tasks, delete not needed tasks
2181 try:
2182 if self.tasks_to_delete:
2183 self._process_delete_db_tasks()
2184 busy = False
2185 ro_task = self._get_db_task()
2186 if ro_task:
2187 self._process_pending_tasks(ro_task)
2188 busy = True
2189 if not busy:
2190 time.sleep(5)
2191 except Exception as e:
2192 self.logger.critical(
2193 "Unexpected exception at run: " + str(e), exc_info=True
2194 )
2195
2196 self.logger.info("Finishing")