Updating tox.ini
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 }
215 self.logger.debug(
216 "task={} {} new-net={} created={}".format(
217 task_id, ro_task["target_id"], vim_net_id, created
218 )
219 )
220
221 return "BUILD", ro_vim_item_update
222 except (vimconn.VimConnException, NsWorkerException) as e:
223 self.logger.error(
224 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
225 )
226 ro_vim_item_update = {
227 "vim_status": "VIM_ERROR",
228 "created": created,
229 "vim_details": str(e),
230 }
231
232 return "FAILED", ro_vim_item_update
233
234 def refresh(self, ro_task):
235 """Call VIM to get network status"""
236 ro_task_id = ro_task["_id"]
237 target_vim = self.my_vims[ro_task["target_id"]]
238 vim_id = ro_task["vim_info"]["vim_id"]
239 net_to_refresh_list = [vim_id]
240
241 try:
242 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
243 vim_info = vim_dict[vim_id]
244
245 if vim_info["status"] == "ACTIVE":
246 task_status = "DONE"
247 elif vim_info["status"] == "BUILD":
248 task_status = "BUILD"
249 else:
250 task_status = "FAILED"
251 except vimconn.VimConnException as e:
252 # Mark all tasks at VIM_ERROR status
253 self.logger.error(
254 "ro_task={} vim={} get-net={}: {}".format(
255 ro_task_id, ro_task["target_id"], vim_id, e
256 )
257 )
258 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
259 task_status = "FAILED"
260
261 ro_vim_item_update = {}
262 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
263 ro_vim_item_update["vim_status"] = vim_info["status"]
264
265 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
266 ro_vim_item_update["vim_name"] = vim_info.get("name")
267
268 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
269 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
270 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
271 elif vim_info["status"] == "DELETED":
272 ro_vim_item_update["vim_id"] = None
273 ro_vim_item_update["vim_details"] = "Deleted externally"
274 else:
275 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
276 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
277
278 if ro_vim_item_update:
279 self.logger.debug(
280 "ro_task={} {} get-net={}: status={} {}".format(
281 ro_task_id,
282 ro_task["target_id"],
283 vim_id,
284 ro_vim_item_update.get("vim_status"),
285 ro_vim_item_update.get("vim_details")
286 if ro_vim_item_update.get("vim_status") != "ACTIVE"
287 else "",
288 )
289 )
290
291 return task_status, ro_vim_item_update
292
293 def delete(self, ro_task, task_index):
294 task = ro_task["tasks"][task_index]
295 task_id = task["task_id"]
296 net_vim_id = ro_task["vim_info"]["vim_id"]
297 ro_vim_item_update_ok = {
298 "vim_status": "DELETED",
299 "created": False,
300 "vim_details": "DELETED",
301 "vim_id": None,
302 }
303
304 try:
305 if net_vim_id or ro_task["vim_info"]["created_items"]:
306 target_vim = self.my_vims[ro_task["target_id"]]
307 target_vim.delete_network(
308 net_vim_id, ro_task["vim_info"]["created_items"]
309 )
310 except vimconn.VimConnNotFoundException:
311 ro_vim_item_update_ok["vim_details"] = "already deleted"
312 except vimconn.VimConnException as e:
313 self.logger.error(
314 "ro_task={} vim={} del-net={}: {}".format(
315 ro_task["_id"], ro_task["target_id"], net_vim_id, e
316 )
317 )
318 ro_vim_item_update = {
319 "vim_status": "VIM_ERROR",
320 "vim_details": "Error while deleting: {}".format(e),
321 }
322
323 return "FAILED", ro_vim_item_update
324
325 self.logger.debug(
326 "task={} {} del-net={} {}".format(
327 task_id,
328 ro_task["target_id"],
329 net_vim_id,
330 ro_vim_item_update_ok.get("vim_details", ""),
331 )
332 )
333
334 return "DONE", ro_vim_item_update_ok
335
336
337 class VimInteractionVdu(VimInteractionBase):
338 max_retries_inject_ssh_key = 20 # 20 times
339 time_retries_inject_ssh_key = 30 # wevery 30 seconds
340
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 created_items = {}
346 target_vim = self.my_vims[ro_task["target_id"]]
347
348 try:
349 created = True
350 params = task["params"]
351 params_copy = deepcopy(params)
352 net_list = params_copy["net_list"]
353
354 for net in net_list:
355 # change task_id into network_id
356 if "net_id" in net and net["net_id"].startswith("TASK-"):
357 network_id = task_depends[net["net_id"]]
358
359 if not network_id:
360 raise NsWorkerException(
361 "Cannot create VM because depends on a network not created or found "
362 "for {}".format(net["net_id"])
363 )
364
365 net["net_id"] = network_id
366
367 if params_copy["image_id"].startswith("TASK-"):
368 params_copy["image_id"] = task_depends[params_copy["image_id"]]
369
370 if params_copy["flavor_id"].startswith("TASK-"):
371 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
372
373 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
374 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
375
376 ro_vim_item_update = {
377 "vim_id": vim_vm_id,
378 "vim_status": "BUILD",
379 "created": created,
380 "created_items": created_items,
381 "vim_details": None,
382 "interfaces_vim_ids": interfaces,
383 "interfaces": [],
384 }
385 self.logger.debug(
386 "task={} {} new-vm={} created={}".format(
387 task_id, ro_task["target_id"], vim_vm_id, created
388 )
389 )
390
391 return "BUILD", ro_vim_item_update
392 except (vimconn.VimConnException, NsWorkerException) as e:
393 self.logger.error(
394 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
395 )
396 ro_vim_item_update = {
397 "vim_status": "VIM_ERROR",
398 "created": created,
399 "vim_details": str(e),
400 }
401
402 return "FAILED", ro_vim_item_update
403
404 def delete(self, ro_task, task_index):
405 task = ro_task["tasks"][task_index]
406 task_id = task["task_id"]
407 vm_vim_id = ro_task["vim_info"]["vim_id"]
408 ro_vim_item_update_ok = {
409 "vim_status": "DELETED",
410 "created": False,
411 "vim_details": "DELETED",
412 "vim_id": None,
413 }
414
415 try:
416 if vm_vim_id or ro_task["vim_info"]["created_items"]:
417 target_vim = self.my_vims[ro_task["target_id"]]
418 target_vim.delete_vminstance(
419 vm_vim_id, ro_task["vim_info"]["created_items"]
420 )
421 except vimconn.VimConnNotFoundException:
422 ro_vim_item_update_ok["vim_details"] = "already deleted"
423 except vimconn.VimConnException as e:
424 self.logger.error(
425 "ro_task={} vim={} del-vm={}: {}".format(
426 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
427 )
428 )
429 ro_vim_item_update = {
430 "vim_status": "VIM_ERROR",
431 "vim_details": "Error while deleting: {}".format(e),
432 }
433
434 return "FAILED", ro_vim_item_update
435
436 self.logger.debug(
437 "task={} {} del-vm={} {}".format(
438 task_id,
439 ro_task["target_id"],
440 vm_vim_id,
441 ro_vim_item_update_ok.get("vim_details", ""),
442 )
443 )
444
445 return "DONE", ro_vim_item_update_ok
446
447 def refresh(self, ro_task):
448 """Call VIM to get vm status"""
449 ro_task_id = ro_task["_id"]
450 target_vim = self.my_vims[ro_task["target_id"]]
451 vim_id = ro_task["vim_info"]["vim_id"]
452
453 if not vim_id:
454 return None, None
455
456 vm_to_refresh_list = [vim_id]
457 try:
458 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
459 vim_info = vim_dict[vim_id]
460
461 if vim_info["status"] == "ACTIVE":
462 task_status = "DONE"
463 elif vim_info["status"] == "BUILD":
464 task_status = "BUILD"
465 else:
466 task_status = "FAILED"
467
468 # try to load and parse vim_information
469 try:
470 vim_info_info = yaml.safe_load(vim_info["vim_info"])
471 if vim_info_info.get("name"):
472 vim_info["name"] = vim_info_info["name"]
473 except Exception:
474 pass
475 except vimconn.VimConnException as e:
476 # Mark all tasks at VIM_ERROR status
477 self.logger.error(
478 "ro_task={} vim={} get-vm={}: {}".format(
479 ro_task_id, ro_task["target_id"], vim_id, e
480 )
481 )
482 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
483 task_status = "FAILED"
484
485 ro_vim_item_update = {}
486
487 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
488 vim_interfaces = []
489 if vim_info.get("interfaces"):
490 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
491 iface = next(
492 (
493 iface
494 for iface in vim_info["interfaces"]
495 if vim_iface_id == iface["vim_interface_id"]
496 ),
497 None,
498 )
499 # if iface:
500 # iface.pop("vim_info", None)
501 vim_interfaces.append(iface)
502
503 task_create = next(
504 t
505 for t in ro_task["tasks"]
506 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
507 )
508 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
509 vim_interfaces[task_create["mgmt_vnf_interface"]][
510 "mgmt_vnf_interface"
511 ] = True
512
513 mgmt_vdu_iface = task_create.get(
514 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
515 )
516 if vim_interfaces:
517 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
518
519 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
520 ro_vim_item_update["interfaces"] = vim_interfaces
521
522 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
523 ro_vim_item_update["vim_status"] = vim_info["status"]
524
525 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
526 ro_vim_item_update["vim_name"] = vim_info.get("name")
527
528 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
529 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
530 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
531 elif vim_info["status"] == "DELETED":
532 ro_vim_item_update["vim_id"] = None
533 ro_vim_item_update["vim_details"] = "Deleted externally"
534 else:
535 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
536 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
537
538 if ro_vim_item_update:
539 self.logger.debug(
540 "ro_task={} {} get-vm={}: status={} {}".format(
541 ro_task_id,
542 ro_task["target_id"],
543 vim_id,
544 ro_vim_item_update.get("vim_status"),
545 ro_vim_item_update.get("vim_details")
546 if ro_vim_item_update.get("vim_status") != "ACTIVE"
547 else "",
548 )
549 )
550
551 return task_status, ro_vim_item_update
552
553 def exec(self, ro_task, task_index, task_depends):
554 task = ro_task["tasks"][task_index]
555 task_id = task["task_id"]
556 target_vim = self.my_vims[ro_task["target_id"]]
557 db_task_update = {"retries": 0}
558 retries = task.get("retries", 0)
559
560 try:
561 params = task["params"]
562 params_copy = deepcopy(params)
563 params_copy["ro_key"] = self.db.decrypt(
564 params_copy.pop("private_key"),
565 params_copy.pop("schema_version"),
566 params_copy.pop("salt"),
567 )
568 params_copy["ip_addr"] = params_copy.pop("ip_address")
569 target_vim.inject_user_key(**params_copy)
570 self.logger.debug(
571 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
572 )
573
574 return (
575 "DONE",
576 None,
577 db_task_update,
578 ) # params_copy["key"]
579 except (vimconn.VimConnException, NsWorkerException) as e:
580 retries += 1
581
582 if retries < self.max_retries_inject_ssh_key:
583 return (
584 "BUILD",
585 None,
586 {
587 "retries": retries,
588 "next_retry": self.time_retries_inject_ssh_key,
589 },
590 )
591
592 self.logger.error(
593 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
594 )
595 ro_vim_item_update = {"vim_details": str(e)}
596
597 return "FAILED", ro_vim_item_update, db_task_update
598
599
600 class VimInteractionImage(VimInteractionBase):
601 def new(self, ro_task, task_index, task_depends):
602 task = ro_task["tasks"][task_index]
603 task_id = task["task_id"]
604 created = False
605 created_items = {}
606 target_vim = self.my_vims[ro_task["target_id"]]
607
608 try:
609 # FIND
610 if task.get("find_params"):
611 vim_images = target_vim.get_image_list(**task["find_params"])
612
613 if not vim_images:
614 raise NsWorkerExceptionNotFound(
615 "Image not found with this criteria: '{}'".format(
616 task["find_params"]
617 )
618 )
619 elif len(vim_images) > 1:
620 raise NsWorkerException(
621 "More than one image found with this criteria: '{}'".format(
622 task["find_params"]
623 )
624 )
625 else:
626 vim_image_id = vim_images[0]["id"]
627
628 ro_vim_item_update = {
629 "vim_id": vim_image_id,
630 "vim_status": "DONE",
631 "created": created,
632 "created_items": created_items,
633 "vim_details": None,
634 }
635 self.logger.debug(
636 "task={} {} new-image={} created={}".format(
637 task_id, ro_task["target_id"], vim_image_id, created
638 )
639 )
640
641 return "DONE", ro_vim_item_update
642 except (NsWorkerException, vimconn.VimConnException) as e:
643 self.logger.error(
644 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
645 )
646 ro_vim_item_update = {
647 "vim_status": "VIM_ERROR",
648 "created": created,
649 "vim_details": str(e),
650 }
651
652 return "FAILED", ro_vim_item_update
653
654
655 class VimInteractionFlavor(VimInteractionBase):
656 def delete(self, ro_task, task_index):
657 task = ro_task["tasks"][task_index]
658 task_id = task["task_id"]
659 flavor_vim_id = ro_task["vim_info"]["vim_id"]
660 ro_vim_item_update_ok = {
661 "vim_status": "DELETED",
662 "created": False,
663 "vim_details": "DELETED",
664 "vim_id": None,
665 }
666
667 try:
668 if flavor_vim_id:
669 target_vim = self.my_vims[ro_task["target_id"]]
670 target_vim.delete_flavor(flavor_vim_id)
671 except vimconn.VimConnNotFoundException:
672 ro_vim_item_update_ok["vim_details"] = "already deleted"
673 except vimconn.VimConnException as e:
674 self.logger.error(
675 "ro_task={} vim={} del-flavor={}: {}".format(
676 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
677 )
678 )
679 ro_vim_item_update = {
680 "vim_status": "VIM_ERROR",
681 "vim_details": "Error while deleting: {}".format(e),
682 }
683
684 return "FAILED", ro_vim_item_update
685
686 self.logger.debug(
687 "task={} {} del-flavor={} {}".format(
688 task_id,
689 ro_task["target_id"],
690 flavor_vim_id,
691 ro_vim_item_update_ok.get("vim_details", ""),
692 )
693 )
694
695 return "DONE", ro_vim_item_update_ok
696
697 def new(self, ro_task, task_index, task_depends):
698 task = ro_task["tasks"][task_index]
699 task_id = task["task_id"]
700 created = False
701 created_items = {}
702 target_vim = self.my_vims[ro_task["target_id"]]
703
704 try:
705 # FIND
706 vim_flavor_id = None
707
708 if task.get("find_params"):
709 try:
710 flavor_data = task["find_params"]["flavor_data"]
711 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
712 except vimconn.VimConnNotFoundException:
713 pass
714
715 if not vim_flavor_id and task.get("params"):
716 # CREATE
717 flavor_data = task["params"]["flavor_data"]
718 vim_flavor_id = target_vim.new_flavor(flavor_data)
719 created = True
720
721 ro_vim_item_update = {
722 "vim_id": vim_flavor_id,
723 "vim_status": "DONE",
724 "created": created,
725 "created_items": created_items,
726 "vim_details": None,
727 }
728 self.logger.debug(
729 "task={} {} new-flavor={} created={}".format(
730 task_id, ro_task["target_id"], vim_flavor_id, created
731 )
732 )
733
734 return "DONE", ro_vim_item_update
735 except (vimconn.VimConnException, NsWorkerException) as e:
736 self.logger.error(
737 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
738 )
739 ro_vim_item_update = {
740 "vim_status": "VIM_ERROR",
741 "created": created,
742 "vim_details": str(e),
743 }
744
745 return "FAILED", ro_vim_item_update
746
747
748 class VimInteractionSdnNet(VimInteractionBase):
749 @staticmethod
750 def _match_pci(port_pci, mapping):
751 """
752 Check if port_pci matches with mapping
753 mapping can have brackets to indicate that several chars are accepted. e.g
754 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
755 :param port_pci: text
756 :param mapping: text, can contain brackets to indicate several chars are available
757 :return: True if matches, False otherwise
758 """
759 if not port_pci or not mapping:
760 return False
761 if port_pci == mapping:
762 return True
763
764 mapping_index = 0
765 pci_index = 0
766 while True:
767 bracket_start = mapping.find("[", mapping_index)
768
769 if bracket_start == -1:
770 break
771
772 bracket_end = mapping.find("]", bracket_start)
773 if bracket_end == -1:
774 break
775
776 length = bracket_start - mapping_index
777 if (
778 length
779 and port_pci[pci_index : pci_index + length]
780 != mapping[mapping_index:bracket_start]
781 ):
782 return False
783
784 if (
785 port_pci[pci_index + length]
786 not in mapping[bracket_start + 1 : bracket_end]
787 ):
788 return False
789
790 pci_index += length + 1
791 mapping_index = bracket_end + 1
792
793 if port_pci[pci_index:] != mapping[mapping_index:]:
794 return False
795
796 return True
797
798 def _get_interfaces(self, vlds_to_connect, vim_account_id):
799 """
800 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
801 :param vim_account_id:
802 :return:
803 """
804 interfaces = []
805
806 for vld in vlds_to_connect:
807 table, _, db_id = vld.partition(":")
808 db_id, _, vld = db_id.partition(":")
809 _, _, vld_id = vld.partition(".")
810
811 if table == "vnfrs":
812 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
813 iface_key = "vnf-vld-id"
814 else: # table == "nsrs"
815 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
816 iface_key = "ns-vld-id"
817
818 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
819
820 for db_vnfr in db_vnfrs:
821 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
822 for iface_index, interface in enumerate(vdur["interfaces"]):
823 if interface.get(iface_key) == vld_id and interface.get(
824 "type"
825 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
826 # only SR-IOV o PT
827 interface_ = interface.copy()
828 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
829 db_vnfr["_id"], vdu_index, iface_index
830 )
831
832 if vdur.get("status") == "ERROR":
833 interface_["status"] = "ERROR"
834
835 interfaces.append(interface_)
836
837 return interfaces
838
839 def refresh(self, ro_task):
840 # look for task create
841 task_create_index, _ = next(
842 i_t
843 for i_t in enumerate(ro_task["tasks"])
844 if i_t[1]
845 and i_t[1]["action"] == "CREATE"
846 and i_t[1]["status"] != "FINISHED"
847 )
848
849 return self.new(ro_task, task_create_index, None)
850
851 def new(self, ro_task, task_index, task_depends):
852
853 task = ro_task["tasks"][task_index]
854 task_id = task["task_id"]
855 target_vim = self.my_vims[ro_task["target_id"]]
856
857 sdn_net_id = ro_task["vim_info"]["vim_id"]
858
859 created_items = ro_task["vim_info"].get("created_items")
860 connected_ports = ro_task["vim_info"].get("connected_ports", [])
861 new_connected_ports = []
862 last_update = ro_task["vim_info"].get("last_update", 0)
863 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
864 error_list = []
865 created = ro_task["vim_info"].get("created", False)
866
867 try:
868 # CREATE
869 params = task["params"]
870 vlds_to_connect = params["vlds"]
871 associated_vim = params["target_vim"]
872 # external additional ports
873 additional_ports = params.get("sdn-ports") or ()
874 _, _, vim_account_id = associated_vim.partition(":")
875
876 if associated_vim:
877 # get associated VIM
878 if associated_vim not in self.db_vims:
879 self.db_vims[associated_vim] = self.db.get_one(
880 "vim_accounts", {"_id": vim_account_id}
881 )
882
883 db_vim = self.db_vims[associated_vim]
884
885 # look for ports to connect
886 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
887 # print(ports)
888
889 sdn_ports = []
890 pending_ports = error_ports = 0
891 vlan_used = None
892 sdn_need_update = False
893
894 for port in ports:
895 vlan_used = port.get("vlan") or vlan_used
896
897 # TODO. Do not connect if already done
898 if not port.get("compute_node") or not port.get("pci"):
899 if port.get("status") == "ERROR":
900 error_ports += 1
901 else:
902 pending_ports += 1
903 continue
904
905 pmap = None
906 compute_node_mappings = next(
907 (
908 c
909 for c in db_vim["config"].get("sdn-port-mapping", ())
910 if c and c["compute_node"] == port["compute_node"]
911 ),
912 None,
913 )
914
915 if compute_node_mappings:
916 # process port_mapping pci of type 0000:af:1[01].[1357]
917 pmap = next(
918 (
919 p
920 for p in compute_node_mappings["ports"]
921 if self._match_pci(port["pci"], p.get("pci"))
922 ),
923 None,
924 )
925
926 if not pmap:
927 if not db_vim["config"].get("mapping_not_needed"):
928 error_list.append(
929 "Port mapping not found for compute_node={} pci={}".format(
930 port["compute_node"], port["pci"]
931 )
932 )
933 continue
934
935 pmap = {}
936
937 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
938 new_port = {
939 "service_endpoint_id": pmap.get("service_endpoint_id")
940 or service_endpoint_id,
941 "service_endpoint_encapsulation_type": "dot1q"
942 if port["type"] == "SR-IOV"
943 else None,
944 "service_endpoint_encapsulation_info": {
945 "vlan": port.get("vlan"),
946 "mac": port.get("mac-address"),
947 "device_id": pmap.get("device_id") or port["compute_node"],
948 "device_interface_id": pmap.get("device_interface_id")
949 or port["pci"],
950 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
951 "switch_port": pmap.get("switch_port"),
952 "service_mapping_info": pmap.get("service_mapping_info"),
953 },
954 }
955
956 # TODO
957 # if port["modified_at"] > last_update:
958 # sdn_need_update = True
959 new_connected_ports.append(port["id"]) # TODO
960 sdn_ports.append(new_port)
961
962 if error_ports:
963 error_list.append(
964 "{} interfaces have not been created as VDU is on ERROR status".format(
965 error_ports
966 )
967 )
968
969 # connect external ports
970 for index, additional_port in enumerate(additional_ports):
971 additional_port_id = additional_port.get(
972 "service_endpoint_id"
973 ) or "external-{}".format(index)
974 sdn_ports.append(
975 {
976 "service_endpoint_id": additional_port_id,
977 "service_endpoint_encapsulation_type": additional_port.get(
978 "service_endpoint_encapsulation_type", "dot1q"
979 ),
980 "service_endpoint_encapsulation_info": {
981 "vlan": additional_port.get("vlan") or vlan_used,
982 "mac": additional_port.get("mac_address"),
983 "device_id": additional_port.get("device_id"),
984 "device_interface_id": additional_port.get(
985 "device_interface_id"
986 ),
987 "switch_dpid": additional_port.get("switch_dpid")
988 or additional_port.get("switch_id"),
989 "switch_port": additional_port.get("switch_port"),
990 "service_mapping_info": additional_port.get(
991 "service_mapping_info"
992 ),
993 },
994 }
995 )
996 new_connected_ports.append(additional_port_id)
997 sdn_info = ""
998
999 # if there are more ports to connect or they have been modified, call create/update
1000 if error_list:
1001 sdn_status = "ERROR"
1002 sdn_info = "; ".join(error_list)
1003 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1004 last_update = time.time()
1005
1006 if not sdn_net_id:
1007 if len(sdn_ports) < 2:
1008 sdn_status = "ACTIVE"
1009
1010 if not pending_ports:
1011 self.logger.debug(
1012 "task={} {} new-sdn-net done, less than 2 ports".format(
1013 task_id, ro_task["target_id"]
1014 )
1015 )
1016 else:
1017 net_type = params.get("type") or "ELAN"
1018 (
1019 sdn_net_id,
1020 created_items,
1021 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1022 created = True
1023 self.logger.debug(
1024 "task={} {} new-sdn-net={} created={}".format(
1025 task_id, ro_task["target_id"], sdn_net_id, created
1026 )
1027 )
1028 else:
1029 created_items = target_vim.edit_connectivity_service(
1030 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1031 )
1032 created = True
1033 self.logger.debug(
1034 "task={} {} update-sdn-net={} created={}".format(
1035 task_id, ro_task["target_id"], sdn_net_id, created
1036 )
1037 )
1038
1039 connected_ports = new_connected_ports
1040 elif sdn_net_id:
1041 wim_status_dict = target_vim.get_connectivity_service_status(
1042 sdn_net_id, conn_info=created_items
1043 )
1044 sdn_status = wim_status_dict["sdn_status"]
1045
1046 if wim_status_dict.get("sdn_info"):
1047 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1048
1049 if wim_status_dict.get("error_msg"):
1050 sdn_info = wim_status_dict.get("error_msg") or ""
1051
1052 if pending_ports:
1053 if sdn_status != "ERROR":
1054 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1055 len(ports) - pending_ports, len(ports)
1056 )
1057
1058 if sdn_status == "ACTIVE":
1059 sdn_status = "BUILD"
1060
1061 ro_vim_item_update = {
1062 "vim_id": sdn_net_id,
1063 "vim_status": sdn_status,
1064 "created": created,
1065 "created_items": created_items,
1066 "connected_ports": connected_ports,
1067 "vim_details": sdn_info,
1068 "last_update": last_update,
1069 }
1070
1071 return sdn_status, ro_vim_item_update
1072 except Exception as e:
1073 self.logger.error(
1074 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1075 exc_info=not isinstance(
1076 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1077 ),
1078 )
1079 ro_vim_item_update = {
1080 "vim_status": "VIM_ERROR",
1081 "created": created,
1082 "vim_details": str(e),
1083 }
1084
1085 return "FAILED", ro_vim_item_update
1086
1087 def delete(self, ro_task, task_index):
1088 task = ro_task["tasks"][task_index]
1089 task_id = task["task_id"]
1090 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1091 ro_vim_item_update_ok = {
1092 "vim_status": "DELETED",
1093 "created": False,
1094 "vim_details": "DELETED",
1095 "vim_id": None,
1096 }
1097
1098 try:
1099 if sdn_vim_id:
1100 target_vim = self.my_vims[ro_task["target_id"]]
1101 target_vim.delete_connectivity_service(
1102 sdn_vim_id, ro_task["vim_info"].get("created_items")
1103 )
1104
1105 except Exception as e:
1106 if (
1107 isinstance(e, sdnconn.SdnConnectorError)
1108 and e.http_code == HTTPStatus.NOT_FOUND.value
1109 ):
1110 ro_vim_item_update_ok["vim_details"] = "already deleted"
1111 else:
1112 self.logger.error(
1113 "ro_task={} vim={} del-sdn-net={}: {}".format(
1114 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1115 ),
1116 exc_info=not isinstance(
1117 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1118 ),
1119 )
1120 ro_vim_item_update = {
1121 "vim_status": "VIM_ERROR",
1122 "vim_details": "Error while deleting: {}".format(e),
1123 }
1124
1125 return "FAILED", ro_vim_item_update
1126
1127 self.logger.debug(
1128 "task={} {} del-sdn-net={} {}".format(
1129 task_id,
1130 ro_task["target_id"],
1131 sdn_vim_id,
1132 ro_vim_item_update_ok.get("vim_details", ""),
1133 )
1134 )
1135
1136 return "DONE", ro_vim_item_update_ok
1137
1138
1139 class NsWorker(threading.Thread):
1140 REFRESH_BUILD = 5 # 5 seconds
1141 REFRESH_ACTIVE = 60 # 1 minute
1142 REFRESH_ERROR = 600
1143 REFRESH_IMAGE = 3600 * 10
1144 REFRESH_DELETE = 3600 * 10
1145 QUEUE_SIZE = 100
1146 terminate = False
1147
1148 def __init__(self, worker_index, config, plugins, db):
1149 """
1150
1151 :param worker_index: thread index
1152 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1153 :param plugins: global shared dict with the loaded plugins
1154 :param db: database class instance to use
1155 """
1156 threading.Thread.__init__(self)
1157 self.config = config
1158 self.plugins = plugins
1159 self.plugin_name = "unknown"
1160 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1161 self.worker_index = worker_index
1162 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1163 # targetvim: vimplugin class
1164 self.my_vims = {}
1165 # targetvim: vim information from database
1166 self.db_vims = {}
1167 # targetvim list
1168 self.vim_targets = []
1169 self.my_id = config["process_id"] + ":" + str(worker_index)
1170 self.db = db
1171 self.item2class = {
1172 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1173 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1174 "image": VimInteractionImage(
1175 self.db, self.my_vims, self.db_vims, self.logger
1176 ),
1177 "flavor": VimInteractionFlavor(
1178 self.db, self.my_vims, self.db_vims, self.logger
1179 ),
1180 "sdn_net": VimInteractionSdnNet(
1181 self.db, self.my_vims, self.db_vims, self.logger
1182 ),
1183 }
1184 self.time_last_task_processed = None
1185 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1186 self.tasks_to_delete = []
1187 # it is idle when there are not vim_targets associated
1188 self.idle = True
1189 self.task_locked_time = config["global"]["task_locked_time"]
1190
1191 def insert_task(self, task):
1192 try:
1193 self.task_queue.put(task, False)
1194 return None
1195 except queue.Full:
1196 raise NsWorkerException("timeout inserting a task")
1197
1198 def terminate(self):
1199 self.insert_task("exit")
1200
1201 def del_task(self, task):
1202 with self.task_lock:
1203 if task["status"] == "SCHEDULED":
1204 task["status"] = "SUPERSEDED"
1205 return True
1206 else: # task["status"] == "processing"
1207 self.task_lock.release()
1208 return False
1209
1210 def _process_vim_config(self, target_id, db_vim):
1211 """
1212 Process vim config, creating vim configuration files as ca_cert
1213 :param target_id: vim/sdn/wim + id
1214 :param db_vim: Vim dictionary obtained from database
1215 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1216 """
1217 if not db_vim.get("config"):
1218 return
1219
1220 file_name = ""
1221
1222 try:
1223 if db_vim["config"].get("ca_cert_content"):
1224 file_name = "{}:{}".format(target_id, self.worker_index)
1225
1226 try:
1227 mkdir(file_name)
1228 except FileExistsError:
1229 pass
1230
1231 file_name = file_name + "/ca_cert"
1232
1233 with open(file_name, "w") as f:
1234 f.write(db_vim["config"]["ca_cert_content"])
1235 del db_vim["config"]["ca_cert_content"]
1236 db_vim["config"]["ca_cert"] = file_name
1237 except Exception as e:
1238 raise NsWorkerException(
1239 "Error writing to file '{}': {}".format(file_name, e)
1240 )
1241
1242 def _load_plugin(self, name, type="vim"):
1243 # type can be vim or sdn
1244 if "rovim_dummy" not in self.plugins:
1245 self.plugins["rovim_dummy"] = VimDummyConnector
1246
1247 if "rosdn_dummy" not in self.plugins:
1248 self.plugins["rosdn_dummy"] = SdnDummyConnector
1249
1250 if name in self.plugins:
1251 return self.plugins[name]
1252
1253 try:
1254 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1255 self.plugins[name] = ep.load()
1256 except Exception as e:
1257 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1258
1259 if name and name not in self.plugins:
1260 raise NsWorkerException(
1261 "Plugin 'osm_{n}' has not been installed".format(n=name)
1262 )
1263
1264 return self.plugins[name]
1265
1266 def _unload_vim(self, target_id):
1267 """
1268 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1269 :param target_id: Contains type:_id; where type can be 'vim', ...
1270 :return: None.
1271 """
1272 try:
1273 self.db_vims.pop(target_id, None)
1274 self.my_vims.pop(target_id, None)
1275
1276 if target_id in self.vim_targets:
1277 self.vim_targets.remove(target_id)
1278
1279 self.logger.info("Unloaded {}".format(target_id))
1280 rmtree("{}:{}".format(target_id, self.worker_index))
1281 except FileNotFoundError:
1282 pass # this is raised by rmtree if folder does not exist
1283 except Exception as e:
1284 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1285
1286 def _check_vim(self, target_id):
1287 """
1288 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1289 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1290 :return: None.
1291 """
1292 target, _, _id = target_id.partition(":")
1293 now = time.time()
1294 update_dict = {}
1295 unset_dict = {}
1296 op_text = ""
1297 step = ""
1298 loaded = target_id in self.vim_targets
1299 target_database = (
1300 "vim_accounts"
1301 if target == "vim"
1302 else "wim_accounts"
1303 if target == "wim"
1304 else "sdns"
1305 )
1306
1307 try:
1308 step = "Getting {} from db".format(target_id)
1309 db_vim = self.db.get_one(target_database, {"_id": _id})
1310
1311 for op_index, operation in enumerate(
1312 db_vim["_admin"].get("operations", ())
1313 ):
1314 if operation["operationState"] != "PROCESSING":
1315 continue
1316
1317 locked_at = operation.get("locked_at")
1318
1319 if locked_at is not None and locked_at >= now - self.task_locked_time:
1320 # some other thread is doing this operation
1321 return
1322
1323 # lock
1324 op_text = "_admin.operations.{}.".format(op_index)
1325
1326 if not self.db.set_one(
1327 target_database,
1328 q_filter={
1329 "_id": _id,
1330 op_text + "operationState": "PROCESSING",
1331 op_text + "locked_at": locked_at,
1332 },
1333 update_dict={
1334 op_text + "locked_at": now,
1335 "admin.current_operation": op_index,
1336 },
1337 fail_on_empty=False,
1338 ):
1339 return
1340
1341 unset_dict[op_text + "locked_at"] = None
1342 unset_dict["current_operation"] = None
1343 step = "Loading " + target_id
1344 error_text = self._load_vim(target_id)
1345
1346 if not error_text:
1347 step = "Checking connectivity"
1348
1349 if target == "vim":
1350 self.my_vims[target_id].check_vim_connectivity()
1351 else:
1352 self.my_vims[target_id].check_credentials()
1353
1354 update_dict["_admin.operationalState"] = "ENABLED"
1355 update_dict["_admin.detailed-status"] = ""
1356 unset_dict[op_text + "detailed-status"] = None
1357 update_dict[op_text + "operationState"] = "COMPLETED"
1358
1359 return
1360
1361 except Exception as e:
1362 error_text = "{}: {}".format(step, e)
1363 self.logger.error("{} for {}: {}".format(step, target_id, e))
1364
1365 finally:
1366 if update_dict or unset_dict:
1367 if error_text:
1368 update_dict[op_text + "operationState"] = "FAILED"
1369 update_dict[op_text + "detailed-status"] = error_text
1370 unset_dict.pop(op_text + "detailed-status", None)
1371 update_dict["_admin.operationalState"] = "ERROR"
1372 update_dict["_admin.detailed-status"] = error_text
1373
1374 if op_text:
1375 update_dict[op_text + "statusEnteredTime"] = now
1376
1377 self.db.set_one(
1378 target_database,
1379 q_filter={"_id": _id},
1380 update_dict=update_dict,
1381 unset=unset_dict,
1382 fail_on_empty=False,
1383 )
1384
1385 if not loaded:
1386 self._unload_vim(target_id)
1387
1388 def _reload_vim(self, target_id):
1389 if target_id in self.vim_targets:
1390 self._load_vim(target_id)
1391 else:
1392 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1393 # just remove it to force load again next time it is needed
1394 self.db_vims.pop(target_id, None)
1395
1396 def _load_vim(self, target_id):
1397 """
1398 Load or reload a vim_account, sdn_controller or wim_account.
1399 Read content from database, load the plugin if not loaded.
1400 In case of error loading the plugin, it load a failing VIM_connector
1401 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1402 :param target_id: Contains type:_id; where type can be 'vim', ...
1403 :return: None if ok, descriptive text if error
1404 """
1405 target, _, _id = target_id.partition(":")
1406 target_database = (
1407 "vim_accounts"
1408 if target == "vim"
1409 else "wim_accounts"
1410 if target == "wim"
1411 else "sdns"
1412 )
1413 plugin_name = ""
1414 vim = None
1415
1416 try:
1417 step = "Getting {}={} from db".format(target, _id)
1418 # TODO process for wim, sdnc, ...
1419 vim = self.db.get_one(target_database, {"_id": _id})
1420
1421 # if deep_get(vim, "config", "sdn-controller"):
1422 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1423 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1424
1425 step = "Decrypting password"
1426 schema_version = vim.get("schema_version")
1427 self.db.encrypt_decrypt_fields(
1428 vim,
1429 "decrypt",
1430 fields=("password", "secret"),
1431 schema_version=schema_version,
1432 salt=_id,
1433 )
1434 self._process_vim_config(target_id, vim)
1435
1436 if target == "vim":
1437 plugin_name = "rovim_" + vim["vim_type"]
1438 step = "Loading plugin '{}'".format(plugin_name)
1439 vim_module_conn = self._load_plugin(plugin_name)
1440 step = "Loading {}'".format(target_id)
1441 self.my_vims[target_id] = vim_module_conn(
1442 uuid=vim["_id"],
1443 name=vim["name"],
1444 tenant_id=vim.get("vim_tenant_id"),
1445 tenant_name=vim.get("vim_tenant_name"),
1446 url=vim["vim_url"],
1447 url_admin=None,
1448 user=vim["vim_user"],
1449 passwd=vim["vim_password"],
1450 config=vim.get("config") or {},
1451 persistent_info={},
1452 )
1453 else: # sdn
1454 plugin_name = "rosdn_" + vim["type"]
1455 step = "Loading plugin '{}'".format(plugin_name)
1456 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1457 step = "Loading {}'".format(target_id)
1458 wim = deepcopy(vim)
1459 wim_config = wim.pop("config", {}) or {}
1460 wim["uuid"] = wim["_id"]
1461 wim["wim_url"] = wim["url"]
1462
1463 if wim.get("dpid"):
1464 wim_config["dpid"] = wim.pop("dpid")
1465
1466 if wim.get("switch_id"):
1467 wim_config["switch_id"] = wim.pop("switch_id")
1468
1469 # wim, wim_account, config
1470 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1471 self.db_vims[target_id] = vim
1472 self.error_status = None
1473
1474 self.logger.info(
1475 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1476 )
1477 except Exception as e:
1478 self.logger.error(
1479 "Cannot load {} plugin={}: {} {}".format(
1480 target_id, plugin_name, step, e
1481 )
1482 )
1483
1484 self.db_vims[target_id] = vim or {}
1485 self.db_vims[target_id] = FailingConnector(str(e))
1486 error_status = "{} Error: {}".format(step, e)
1487
1488 return error_status
1489 finally:
1490 if target_id not in self.vim_targets:
1491 self.vim_targets.append(target_id)
1492
1493 def _get_db_task(self):
1494 """
1495 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1496 :return: None
1497 """
1498 now = time.time()
1499
1500 if not self.time_last_task_processed:
1501 self.time_last_task_processed = now
1502
1503 try:
1504 while True:
1505 locked = self.db.set_one(
1506 "ro_tasks",
1507 q_filter={
1508 "target_id": self.vim_targets,
1509 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1510 "locked_at.lt": now - self.task_locked_time,
1511 "to_check_at.lt": self.time_last_task_processed,
1512 },
1513 update_dict={"locked_by": self.my_id, "locked_at": now},
1514 fail_on_empty=False,
1515 )
1516
1517 if locked:
1518 # read and return
1519 ro_task = self.db.get_one(
1520 "ro_tasks",
1521 q_filter={
1522 "target_id": self.vim_targets,
1523 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1524 "locked_at": now,
1525 },
1526 )
1527 return ro_task
1528
1529 if self.time_last_task_processed == now:
1530 self.time_last_task_processed = None
1531 return None
1532 else:
1533 self.time_last_task_processed = now
1534 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1535
1536 except DbException as e:
1537 self.logger.error("Database exception at _get_db_task: {}".format(e))
1538 except Exception as e:
1539 self.logger.critical(
1540 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1541 )
1542
1543 return None
1544
1545 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1546 """
1547 Determine if this task need to be done or superseded
1548 :return: None
1549 """
1550 my_task = ro_task["tasks"][task_index]
1551 task_id = my_task["task_id"]
1552 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1553 "created_items", False
1554 )
1555
1556 if my_task["status"] == "FAILED":
1557 return None, None # TODO need to be retry??
1558
1559 try:
1560 for index, task in enumerate(ro_task["tasks"]):
1561 if index == task_index or not task:
1562 continue # own task
1563
1564 if (
1565 my_task["target_record"] == task["target_record"]
1566 and task["action"] == "CREATE"
1567 ):
1568 # set to finished
1569 db_update["tasks.{}.status".format(index)] = task[
1570 "status"
1571 ] = "FINISHED"
1572 elif task["action"] == "CREATE" and task["status"] not in (
1573 "FINISHED",
1574 "SUPERSEDED",
1575 ):
1576 needed_delete = False
1577
1578 if needed_delete:
1579 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1580 else:
1581 return "SUPERSEDED", None
1582 except Exception as e:
1583 if not isinstance(e, NsWorkerException):
1584 self.logger.critical(
1585 "Unexpected exception at _delete_task task={}: {}".format(
1586 task_id, e
1587 ),
1588 exc_info=True,
1589 )
1590
1591 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1592
1593 def _create_task(self, ro_task, task_index, task_depends, db_update):
1594 """
1595 Determine if this task need to create something at VIM
1596 :return: None
1597 """
1598 my_task = ro_task["tasks"][task_index]
1599 task_id = my_task["task_id"]
1600 task_status = None
1601
1602 if my_task["status"] == "FAILED":
1603 return None, None # TODO need to be retry??
1604 elif my_task["status"] == "SCHEDULED":
1605 # check if already created by another task
1606 for index, task in enumerate(ro_task["tasks"]):
1607 if index == task_index or not task:
1608 continue # own task
1609
1610 if task["action"] == "CREATE" and task["status"] not in (
1611 "SCHEDULED",
1612 "FINISHED",
1613 "SUPERSEDED",
1614 ):
1615 return task["status"], "COPY_VIM_INFO"
1616
1617 try:
1618 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1619 ro_task, task_index, task_depends
1620 )
1621 # TODO update other CREATE tasks
1622 except Exception as e:
1623 if not isinstance(e, NsWorkerException):
1624 self.logger.error(
1625 "Error executing task={}: {}".format(task_id, e), exc_info=True
1626 )
1627
1628 task_status = "FAILED"
1629 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1630 # TODO update ro_vim_item_update
1631
1632 return task_status, ro_vim_item_update
1633 else:
1634 return None, None
1635
1636 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1637 """
1638 Look for dependency task
1639 :param task_id: Can be one of
1640 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1641 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1642 3. task.task_id: "<action_id>:number"
1643 :param ro_task:
1644 :param target_id:
1645 :return: database ro_task plus index of task
1646 """
1647 if (
1648 task_id.startswith("vim:")
1649 or task_id.startswith("sdn:")
1650 or task_id.startswith("wim:")
1651 ):
1652 target_id, _, task_id = task_id.partition(" ")
1653
1654 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1655 ro_task_dependency = self.db.get_one(
1656 "ro_tasks",
1657 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1658 fail_on_empty=False,
1659 )
1660
1661 if ro_task_dependency:
1662 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1663 if task["target_record_id"] == task_id:
1664 return ro_task_dependency, task_index
1665
1666 else:
1667 if ro_task:
1668 for task_index, task in enumerate(ro_task["tasks"]):
1669 if task and task["task_id"] == task_id:
1670 return ro_task, task_index
1671
1672 ro_task_dependency = self.db.get_one(
1673 "ro_tasks",
1674 q_filter={
1675 "tasks.ANYINDEX.task_id": task_id,
1676 "tasks.ANYINDEX.target_record.ne": None,
1677 },
1678 fail_on_empty=False,
1679 )
1680
1681 if ro_task_dependency:
1682 for task_index, task in ro_task_dependency["tasks"]:
1683 if task["task_id"] == task_id:
1684 return ro_task_dependency, task_index
1685 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1686
1687 def _process_pending_tasks(self, ro_task):
1688 ro_task_id = ro_task["_id"]
1689 now = time.time()
1690 # one day
1691 next_check_at = now + (24 * 60 * 60)
1692 db_ro_task_update = {}
1693
1694 def _update_refresh(new_status):
1695 # compute next_refresh
1696 nonlocal task
1697 nonlocal next_check_at
1698 nonlocal db_ro_task_update
1699 nonlocal ro_task
1700
1701 next_refresh = time.time()
1702
1703 if task["item"] in ("image", "flavor"):
1704 next_refresh += self.REFRESH_IMAGE
1705 elif new_status == "BUILD":
1706 next_refresh += self.REFRESH_BUILD
1707 elif new_status == "DONE":
1708 next_refresh += self.REFRESH_ACTIVE
1709 else:
1710 next_refresh += self.REFRESH_ERROR
1711
1712 next_check_at = min(next_check_at, next_refresh)
1713 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1714 ro_task["vim_info"]["refresh_at"] = next_refresh
1715
1716 try:
1717 # 0: get task_status_create
1718 lock_object = None
1719 task_status_create = None
1720 task_create = next(
1721 (
1722 t
1723 for t in ro_task["tasks"]
1724 if t
1725 and t["action"] == "CREATE"
1726 and t["status"] in ("BUILD", "DONE")
1727 ),
1728 None,
1729 )
1730
1731 if task_create:
1732 task_status_create = task_create["status"]
1733
1734 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1735 for task_action in ("DELETE", "CREATE", "EXEC"):
1736 db_vim_update = None
1737 new_status = None
1738
1739 for task_index, task in enumerate(ro_task["tasks"]):
1740 if not task:
1741 continue # task deleted
1742
1743 task_depends = {}
1744 target_update = None
1745
1746 if (
1747 (
1748 task_action in ("DELETE", "EXEC")
1749 and task["status"] not in ("SCHEDULED", "BUILD")
1750 )
1751 or task["action"] != task_action
1752 or (
1753 task_action == "CREATE"
1754 and task["status"] in ("FINISHED", "SUPERSEDED")
1755 )
1756 ):
1757 continue
1758
1759 task_path = "tasks.{}.status".format(task_index)
1760 try:
1761 db_vim_info_update = None
1762
1763 if task["status"] == "SCHEDULED":
1764 # check if tasks that this depends on have been completed
1765 dependency_not_completed = False
1766
1767 for dependency_task_id in task.get("depends_on") or ():
1768 (
1769 dependency_ro_task,
1770 dependency_task_index,
1771 ) = self._get_dependency(
1772 dependency_task_id, target_id=ro_task["target_id"]
1773 )
1774 dependency_task = dependency_ro_task["tasks"][
1775 dependency_task_index
1776 ]
1777
1778 if dependency_task["status"] == "SCHEDULED":
1779 dependency_not_completed = True
1780 next_check_at = min(
1781 next_check_at, dependency_ro_task["to_check_at"]
1782 )
1783 # must allow dependent task to be processed first
1784 # to do this set time after last_task_processed
1785 next_check_at = max(
1786 self.time_last_task_processed, next_check_at
1787 )
1788 break
1789 elif dependency_task["status"] == "FAILED":
1790 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1791 task["action"],
1792 task["item"],
1793 dependency_task["action"],
1794 dependency_task["item"],
1795 dependency_task_id,
1796 dependency_ro_task["vim_info"].get(
1797 "vim_details"
1798 ),
1799 )
1800 self.logger.error(
1801 "task={} {}".format(task["task_id"], error_text)
1802 )
1803 raise NsWorkerException(error_text)
1804
1805 task_depends[dependency_task_id] = dependency_ro_task[
1806 "vim_info"
1807 ]["vim_id"]
1808 task_depends[
1809 "TASK-{}".format(dependency_task_id)
1810 ] = dependency_ro_task["vim_info"]["vim_id"]
1811
1812 if dependency_not_completed:
1813 # TODO set at vim_info.vim_details that it is waiting
1814 continue
1815
1816 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1817 # the task of renew this locking. It will update database locket_at periodically
1818 if not lock_object:
1819 lock_object = LockRenew.add_lock_object(
1820 "ro_tasks", ro_task, self
1821 )
1822
1823 if task["action"] == "DELETE":
1824 (new_status, db_vim_info_update,) = self._delete_task(
1825 ro_task, task_index, task_depends, db_ro_task_update
1826 )
1827 new_status = (
1828 "FINISHED" if new_status == "DONE" else new_status
1829 )
1830 # ^with FINISHED instead of DONE it will not be refreshing
1831
1832 if new_status in ("FINISHED", "SUPERSEDED"):
1833 target_update = "DELETE"
1834 elif task["action"] == "EXEC":
1835 (
1836 new_status,
1837 db_vim_info_update,
1838 db_task_update,
1839 ) = self.item2class[task["item"]].exec(
1840 ro_task, task_index, task_depends
1841 )
1842 new_status = (
1843 "FINISHED" if new_status == "DONE" else new_status
1844 )
1845 # ^with FINISHED instead of DONE it will not be refreshing
1846
1847 if db_task_update:
1848 # load into database the modified db_task_update "retries" and "next_retry"
1849 if db_task_update.get("retries"):
1850 db_ro_task_update[
1851 "tasks.{}.retries".format(task_index)
1852 ] = db_task_update["retries"]
1853
1854 next_check_at = time.time() + db_task_update.get(
1855 "next_retry", 60
1856 )
1857 target_update = None
1858 elif task["action"] == "CREATE":
1859 if task["status"] == "SCHEDULED":
1860 if task_status_create:
1861 new_status = task_status_create
1862 target_update = "COPY_VIM_INFO"
1863 else:
1864 new_status, db_vim_info_update = self.item2class[
1865 task["item"]
1866 ].new(ro_task, task_index, task_depends)
1867 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
1868 _update_refresh(new_status)
1869 else:
1870 if (
1871 ro_task["vim_info"]["refresh_at"]
1872 and now > ro_task["vim_info"]["refresh_at"]
1873 ):
1874 new_status, db_vim_info_update = self.item2class[
1875 task["item"]
1876 ].refresh(ro_task)
1877 _update_refresh(new_status)
1878
1879 except Exception as e:
1880 new_status = "FAILED"
1881 db_vim_info_update = {
1882 "vim_status": "VIM_ERROR",
1883 "vim_details": str(e),
1884 }
1885
1886 if not isinstance(
1887 e, (NsWorkerException, vimconn.VimConnException)
1888 ):
1889 self.logger.error(
1890 "Unexpected exception at _delete_task task={}: {}".format(
1891 task["task_id"], e
1892 ),
1893 exc_info=True,
1894 )
1895
1896 try:
1897 if db_vim_info_update:
1898 db_vim_update = db_vim_info_update.copy()
1899 db_ro_task_update.update(
1900 {
1901 "vim_info." + k: v
1902 for k, v in db_vim_info_update.items()
1903 }
1904 )
1905 ro_task["vim_info"].update(db_vim_info_update)
1906
1907 if new_status:
1908 if task_action == "CREATE":
1909 task_status_create = new_status
1910 db_ro_task_update[task_path] = new_status
1911
1912 if target_update or db_vim_update:
1913 if target_update == "DELETE":
1914 self._update_target(task, None)
1915 elif target_update == "COPY_VIM_INFO":
1916 self._update_target(task, ro_task["vim_info"])
1917 else:
1918 self._update_target(task, db_vim_update)
1919
1920 except Exception as e:
1921 if (
1922 isinstance(e, DbException)
1923 and e.http_code == HTTPStatus.NOT_FOUND
1924 ):
1925 # if the vnfrs or nsrs has been removed from database, this task must be removed
1926 self.logger.debug(
1927 "marking to delete task={}".format(task["task_id"])
1928 )
1929 self.tasks_to_delete.append(task)
1930 else:
1931 self.logger.error(
1932 "Unexpected exception at _update_target task={}: {}".format(
1933 task["task_id"], e
1934 ),
1935 exc_info=True,
1936 )
1937
1938 locked_at = ro_task["locked_at"]
1939
1940 if lock_object:
1941 locked_at = [
1942 lock_object["locked_at"],
1943 lock_object["locked_at"] + self.task_locked_time,
1944 ]
1945 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
1946 # contain exactly locked_at + self.task_locked_time
1947 LockRenew.remove_lock_object(lock_object)
1948
1949 q_filter = {
1950 "_id": ro_task["_id"],
1951 "to_check_at": ro_task["to_check_at"],
1952 "locked_at": locked_at,
1953 }
1954 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
1955 # outside this task (by ro_nbi) do not update it
1956 db_ro_task_update["locked_by"] = None
1957 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
1958 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
1959 db_ro_task_update["modified_at"] = now
1960 db_ro_task_update["to_check_at"] = next_check_at
1961
1962 if not self.db.set_one(
1963 "ro_tasks",
1964 update_dict=db_ro_task_update,
1965 q_filter=q_filter,
1966 fail_on_empty=False,
1967 ):
1968 del db_ro_task_update["to_check_at"]
1969 del q_filter["to_check_at"]
1970 self.db.set_one(
1971 "ro_tasks",
1972 q_filter=q_filter,
1973 update_dict=db_ro_task_update,
1974 fail_on_empty=True,
1975 )
1976 except DbException as e:
1977 self.logger.error(
1978 "ro_task={} Error updating database {}".format(ro_task_id, e)
1979 )
1980 except Exception as e:
1981 self.logger.error(
1982 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
1983 )
1984
1985 def _update_target(self, task, ro_vim_item_update):
1986 table, _, temp = task["target_record"].partition(":")
1987 _id, _, path_vim_status = temp.partition(":")
1988 path_item = path_vim_status[: path_vim_status.rfind(".")]
1989 path_item = path_item[: path_item.rfind(".")]
1990 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
1991 # path_item: dot separated list targeting record information, e.g. "vdur.10"
1992
1993 if ro_vim_item_update:
1994 update_dict = {
1995 path_vim_status + "." + k: v
1996 for k, v in ro_vim_item_update.items()
1997 if k
1998 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
1999 }
2000
2001 if path_vim_status.startswith("vdur."):
2002 # for backward compatibility, add vdur.name apart from vdur.vim_name
2003 if ro_vim_item_update.get("vim_name"):
2004 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2005
2006 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2007 if ro_vim_item_update.get("vim_id"):
2008 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2009
2010 # update general status
2011 if ro_vim_item_update.get("vim_status"):
2012 update_dict[path_item + ".status"] = ro_vim_item_update[
2013 "vim_status"
2014 ]
2015
2016 if ro_vim_item_update.get("interfaces"):
2017 path_interfaces = path_item + ".interfaces"
2018
2019 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2020 if iface:
2021 update_dict.update(
2022 {
2023 path_interfaces + ".{}.".format(i) + k: v
2024 for k, v in iface.items()
2025 if k in ("vlan", "compute_node", "pci")
2026 }
2027 )
2028
2029 # put ip_address and mac_address with ip-address and mac-address
2030 if iface.get("ip_address"):
2031 update_dict[
2032 path_interfaces + ".{}.".format(i) + "ip-address"
2033 ] = iface["ip_address"]
2034
2035 if iface.get("mac_address"):
2036 update_dict[
2037 path_interfaces + ".{}.".format(i) + "mac-address"
2038 ] = iface["mac_address"]
2039
2040 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2041 update_dict["ip-address"] = iface.get("ip_address").split(
2042 ";"
2043 )[0]
2044
2045 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2046 update_dict[path_item + ".ip-address"] = iface.get(
2047 "ip_address"
2048 ).split(";")[0]
2049
2050 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2051 else:
2052 update_dict = {path_item + ".status": "DELETED"}
2053 self.db.set_one(
2054 table,
2055 q_filter={"_id": _id},
2056 update_dict=update_dict,
2057 unset={path_vim_status: None},
2058 )
2059
2060 def _process_delete_db_tasks(self):
2061 """
2062 Delete task from database because vnfrs or nsrs or both have been deleted
2063 :return: None. Uses and modify self.tasks_to_delete
2064 """
2065 while self.tasks_to_delete:
2066 task = self.tasks_to_delete[0]
2067 vnfrs_deleted = None
2068 nsr_id = task["nsr_id"]
2069
2070 if task["target_record"].startswith("vnfrs:"):
2071 # check if nsrs is present
2072 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2073 vnfrs_deleted = task["target_record"].split(":")[1]
2074
2075 try:
2076 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2077 except Exception as e:
2078 self.logger.error(
2079 "Error deleting task={}: {}".format(task["task_id"], e)
2080 )
2081 self.tasks_to_delete.pop(0)
2082
2083 @staticmethod
2084 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2085 """
2086 Static method because it is called from osm_ng_ro.ns
2087 :param db: instance of database to use
2088 :param nsr_id: affected nsrs id
2089 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2090 :return: None, exception is fails
2091 """
2092 retries = 5
2093 for retry in range(retries):
2094 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2095 now = time.time()
2096 conflict = False
2097
2098 for ro_task in ro_tasks:
2099 db_update = {}
2100 to_delete_ro_task = True
2101
2102 for index, task in enumerate(ro_task["tasks"]):
2103 if not task:
2104 pass
2105 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2106 vnfrs_deleted
2107 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2108 ):
2109 db_update["tasks.{}".format(index)] = None
2110 else:
2111 # used by other nsr, ro_task cannot be deleted
2112 to_delete_ro_task = False
2113
2114 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2115 if to_delete_ro_task:
2116 if not db.del_one(
2117 "ro_tasks",
2118 q_filter={
2119 "_id": ro_task["_id"],
2120 "modified_at": ro_task["modified_at"],
2121 },
2122 fail_on_empty=False,
2123 ):
2124 conflict = True
2125 elif db_update:
2126 db_update["modified_at"] = now
2127 if not db.set_one(
2128 "ro_tasks",
2129 q_filter={
2130 "_id": ro_task["_id"],
2131 "modified_at": ro_task["modified_at"],
2132 },
2133 update_dict=db_update,
2134 fail_on_empty=False,
2135 ):
2136 conflict = True
2137 if not conflict:
2138 return
2139 else:
2140 raise NsWorkerException("Exceeded {} retries".format(retries))
2141
2142 def run(self):
2143 # load database
2144 self.logger.info("Starting")
2145 while True:
2146 # step 1: get commands from queue
2147 try:
2148 if self.vim_targets:
2149 task = self.task_queue.get(block=False)
2150 else:
2151 if not self.idle:
2152 self.logger.debug("enters in idle state")
2153 self.idle = True
2154 task = self.task_queue.get(block=True)
2155 self.idle = False
2156
2157 if task[0] == "terminate":
2158 break
2159 elif task[0] == "load_vim":
2160 self.logger.info("order to load vim {}".format(task[1]))
2161 self._load_vim(task[1])
2162 elif task[0] == "unload_vim":
2163 self.logger.info("order to unload vim {}".format(task[1]))
2164 self._unload_vim(task[1])
2165 elif task[0] == "reload_vim":
2166 self._reload_vim(task[1])
2167 elif task[0] == "check_vim":
2168 self.logger.info("order to check vim {}".format(task[1]))
2169 self._check_vim(task[1])
2170 continue
2171 except Exception as e:
2172 if isinstance(e, queue.Empty):
2173 pass
2174 else:
2175 self.logger.critical(
2176 "Error processing task: {}".format(e), exc_info=True
2177 )
2178
2179 # step 2: process pending_tasks, delete not needed tasks
2180 try:
2181 if self.tasks_to_delete:
2182 self._process_delete_db_tasks()
2183 busy = False
2184 ro_task = self._get_db_task()
2185 if ro_task:
2186 self._process_pending_tasks(ro_task)
2187 busy = True
2188 if not busy:
2189 time.sleep(5)
2190 except Exception as e:
2191 self.logger.critical(
2192 "Unexpected exception at run: " + str(e), exc_info=True
2193 )
2194
2195 self.logger.info("Finishing")