Fix Bug 1781 RO does not create management networks where appropriate
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45 __author__ = "Alfonso Tierno"
46 __date__ = "$28-Sep-2017 12:07:15$"
47
48
49 def deep_get(target_dict, *args, **kwargs):
50 """
51 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
52 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
53 :param target_dict: dictionary to be read
54 :param args: list of keys to read from target_dict
55 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
56 :return: The wanted value if exist, None or default otherwise
57 """
58 for key in args:
59 if not isinstance(target_dict, dict) or key not in target_dict:
60 return kwargs.get("default")
61 target_dict = target_dict[key]
62 return target_dict
63
64
65 class NsWorkerException(Exception):
66 pass
67
68
69 class FailingConnector:
70 def __init__(self, error_msg):
71 self.error_msg = error_msg
72
73 for method in dir(vimconn.VimConnector):
74 if method[0] != "_":
75 setattr(
76 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
77 )
78
79 for method in dir(sdnconn.SdnConnectorBase):
80 if method[0] != "_":
81 setattr(
82 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
83 )
84
85
86 class NsWorkerExceptionNotFound(NsWorkerException):
87 pass
88
89
90 class VimInteractionBase:
91 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
92 It implements methods that does nothing and return ok"""
93
94 def __init__(self, db, my_vims, db_vims, logger):
95 self.db = db
96 self.logger = logger
97 self.my_vims = my_vims
98 self.db_vims = db_vims
99
100 def new(self, ro_task, task_index, task_depends):
101 return "BUILD", {}
102
103 def refresh(self, ro_task):
104 """skip calling VIM to get image, flavor status. Assumes ok"""
105 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
106 return "FAILED", {}
107
108 return "DONE", {}
109
110 def delete(self, ro_task, task_index):
111 """skip calling VIM to delete image. Assumes ok"""
112 return "DONE", {}
113
114 def exec(self, ro_task, task_index, task_depends):
115 return "DONE", None, None
116
117
118 class VimInteractionNet(VimInteractionBase):
119 def new(self, ro_task, task_index, task_depends):
120 vim_net_id = None
121 task = ro_task["tasks"][task_index]
122 task_id = task["task_id"]
123 created = False
124 created_items = {}
125 target_vim = self.my_vims[ro_task["target_id"]]
126 mgmtnet = False
127 mgmtnet_defined_in_vim = False
128
129 try:
130 # FIND
131 if task.get("find_params"):
132 # if management, get configuration of VIM
133 if task["find_params"].get("filter_dict"):
134 vim_filter = task["find_params"]["filter_dict"]
135 # management network
136 elif task["find_params"].get("mgmt"):
137 mgmtnet = True
138 if deep_get(
139 self.db_vims[ro_task["target_id"]],
140 "config",
141 "management_network_id",
142 ):
143 mgmtnet_defined_in_vim = True
144 vim_filter = {
145 "id": self.db_vims[ro_task["target_id"]]["config"][
146 "management_network_id"
147 ]
148 }
149 elif deep_get(
150 self.db_vims[ro_task["target_id"]],
151 "config",
152 "management_network_name",
153 ):
154 mgmtnet_defined_in_vim = True
155 vim_filter = {
156 "name": self.db_vims[ro_task["target_id"]]["config"][
157 "management_network_name"
158 ]
159 }
160 else:
161 vim_filter = {"name": task["find_params"]["name"]}
162 else:
163 raise NsWorkerExceptionNotFound(
164 "Invalid find_params for new_net {}".format(task["find_params"])
165 )
166
167 vim_nets = target_vim.get_network_list(vim_filter)
168 if not vim_nets and not task.get("params"):
169 # If there is mgmt-network in the descriptor,
170 # there is no mapping of that network to a VIM network in the descriptor,
171 # also there is no mapping in the "--config" parameter or at VIM creation;
172 # that mgmt-network will be created.
173 if mgmtnet and not mgmtnet_defined_in_vim:
174 net_name = (
175 vim_filter.get("name")
176 if vim_filter.get("name")
177 else vim_filter.get("id")[:16]
178 )
179 vim_net_id, created_items = target_vim.new_network(
180 net_name, None
181 )
182 self.logger.debug(
183 "Created mgmt network vim_net_id: {}".format(vim_net_id)
184 )
185 created = True
186 else:
187 raise NsWorkerExceptionNotFound(
188 "Network not found with this criteria: '{}'".format(
189 task.get("find_params")
190 )
191 )
192 elif len(vim_nets) > 1:
193 raise NsWorkerException(
194 "More than one network found with this criteria: '{}'".format(
195 task["find_params"]
196 )
197 )
198
199 if vim_nets:
200 vim_net_id = vim_nets[0]["id"]
201 else:
202 # CREATE
203 params = task["params"]
204 vim_net_id, created_items = target_vim.new_network(**params)
205 created = True
206
207 ro_vim_item_update = {
208 "vim_id": vim_net_id,
209 "vim_status": "BUILD",
210 "created": created,
211 "created_items": created_items,
212 "vim_details": None,
213 }
214 self.logger.debug(
215 "task={} {} new-net={} created={}".format(
216 task_id, ro_task["target_id"], vim_net_id, created
217 )
218 )
219
220 return "BUILD", ro_vim_item_update
221 except (vimconn.VimConnException, NsWorkerException) as e:
222 self.logger.error(
223 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
224 )
225 ro_vim_item_update = {
226 "vim_status": "VIM_ERROR",
227 "created": created,
228 "vim_details": str(e),
229 }
230
231 return "FAILED", ro_vim_item_update
232
233 def refresh(self, ro_task):
234 """Call VIM to get network status"""
235 ro_task_id = ro_task["_id"]
236 target_vim = self.my_vims[ro_task["target_id"]]
237 vim_id = ro_task["vim_info"]["vim_id"]
238 net_to_refresh_list = [vim_id]
239
240 try:
241 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
242 vim_info = vim_dict[vim_id]
243
244 if vim_info["status"] == "ACTIVE":
245 task_status = "DONE"
246 elif vim_info["status"] == "BUILD":
247 task_status = "BUILD"
248 else:
249 task_status = "FAILED"
250 except vimconn.VimConnException as e:
251 # Mark all tasks at VIM_ERROR status
252 self.logger.error(
253 "ro_task={} vim={} get-net={}: {}".format(
254 ro_task_id, ro_task["target_id"], vim_id, e
255 )
256 )
257 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
258 task_status = "FAILED"
259
260 ro_vim_item_update = {}
261 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
262 ro_vim_item_update["vim_status"] = vim_info["status"]
263
264 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
265 ro_vim_item_update["vim_name"] = vim_info.get("name")
266
267 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
268 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
269 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
270 elif vim_info["status"] == "DELETED":
271 ro_vim_item_update["vim_id"] = None
272 ro_vim_item_update["vim_details"] = "Deleted externally"
273 else:
274 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
275 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
276
277 if ro_vim_item_update:
278 self.logger.debug(
279 "ro_task={} {} get-net={}: status={} {}".format(
280 ro_task_id,
281 ro_task["target_id"],
282 vim_id,
283 ro_vim_item_update.get("vim_status"),
284 ro_vim_item_update.get("vim_details")
285 if ro_vim_item_update.get("vim_status") != "ACTIVE"
286 else "",
287 )
288 )
289
290 return task_status, ro_vim_item_update
291
292 def delete(self, ro_task, task_index):
293 task = ro_task["tasks"][task_index]
294 task_id = task["task_id"]
295 net_vim_id = ro_task["vim_info"]["vim_id"]
296 ro_vim_item_update_ok = {
297 "vim_status": "DELETED",
298 "created": False,
299 "vim_details": "DELETED",
300 "vim_id": None,
301 }
302
303 try:
304 if net_vim_id or ro_task["vim_info"]["created_items"]:
305 target_vim = self.my_vims[ro_task["target_id"]]
306 target_vim.delete_network(
307 net_vim_id, ro_task["vim_info"]["created_items"]
308 )
309 except vimconn.VimConnNotFoundException:
310 ro_vim_item_update_ok["vim_details"] = "already deleted"
311 except vimconn.VimConnException as e:
312 self.logger.error(
313 "ro_task={} vim={} del-net={}: {}".format(
314 ro_task["_id"], ro_task["target_id"], net_vim_id, e
315 )
316 )
317 ro_vim_item_update = {
318 "vim_status": "VIM_ERROR",
319 "vim_details": "Error while deleting: {}".format(e),
320 }
321
322 return "FAILED", ro_vim_item_update
323
324 self.logger.debug(
325 "task={} {} del-net={} {}".format(
326 task_id,
327 ro_task["target_id"],
328 net_vim_id,
329 ro_vim_item_update_ok.get("vim_details", ""),
330 )
331 )
332
333 return "DONE", ro_vim_item_update_ok
334
335
336 class VimInteractionVdu(VimInteractionBase):
337 max_retries_inject_ssh_key = 20 # 20 times
338 time_retries_inject_ssh_key = 30 # wevery 30 seconds
339
340 def new(self, ro_task, task_index, task_depends):
341 task = ro_task["tasks"][task_index]
342 task_id = task["task_id"]
343 created = False
344 created_items = {}
345 target_vim = self.my_vims[ro_task["target_id"]]
346
347 try:
348 created = True
349 params = task["params"]
350 params_copy = deepcopy(params)
351 net_list = params_copy["net_list"]
352
353 for net in net_list:
354 # change task_id into network_id
355 if "net_id" in net and net["net_id"].startswith("TASK-"):
356 network_id = task_depends[net["net_id"]]
357
358 if not network_id:
359 raise NsWorkerException(
360 "Cannot create VM because depends on a network not created or found "
361 "for {}".format(net["net_id"])
362 )
363
364 net["net_id"] = network_id
365
366 if params_copy["image_id"].startswith("TASK-"):
367 params_copy["image_id"] = task_depends[params_copy["image_id"]]
368
369 if params_copy["flavor_id"].startswith("TASK-"):
370 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
371
372 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
373 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
374
375 ro_vim_item_update = {
376 "vim_id": vim_vm_id,
377 "vim_status": "BUILD",
378 "created": created,
379 "created_items": created_items,
380 "vim_details": None,
381 "interfaces_vim_ids": interfaces,
382 "interfaces": [],
383 }
384 self.logger.debug(
385 "task={} {} new-vm={} created={}".format(
386 task_id, ro_task["target_id"], vim_vm_id, created
387 )
388 )
389
390 return "BUILD", ro_vim_item_update
391 except (vimconn.VimConnException, NsWorkerException) as e:
392 self.logger.error(
393 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
394 )
395 ro_vim_item_update = {
396 "vim_status": "VIM_ERROR",
397 "created": created,
398 "vim_details": str(e),
399 }
400
401 return "FAILED", ro_vim_item_update
402
403 def delete(self, ro_task, task_index):
404 task = ro_task["tasks"][task_index]
405 task_id = task["task_id"]
406 vm_vim_id = ro_task["vim_info"]["vim_id"]
407 ro_vim_item_update_ok = {
408 "vim_status": "DELETED",
409 "created": False,
410 "vim_details": "DELETED",
411 "vim_id": None,
412 }
413
414 try:
415 if vm_vim_id or ro_task["vim_info"]["created_items"]:
416 target_vim = self.my_vims[ro_task["target_id"]]
417 target_vim.delete_vminstance(
418 vm_vim_id, ro_task["vim_info"]["created_items"]
419 )
420 except vimconn.VimConnNotFoundException:
421 ro_vim_item_update_ok["vim_details"] = "already deleted"
422 except vimconn.VimConnException as e:
423 self.logger.error(
424 "ro_task={} vim={} del-vm={}: {}".format(
425 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
426 )
427 )
428 ro_vim_item_update = {
429 "vim_status": "VIM_ERROR",
430 "vim_details": "Error while deleting: {}".format(e),
431 }
432
433 return "FAILED", ro_vim_item_update
434
435 self.logger.debug(
436 "task={} {} del-vm={} {}".format(
437 task_id,
438 ro_task["target_id"],
439 vm_vim_id,
440 ro_vim_item_update_ok.get("vim_details", ""),
441 )
442 )
443
444 return "DONE", ro_vim_item_update_ok
445
446 def refresh(self, ro_task):
447 """Call VIM to get vm status"""
448 ro_task_id = ro_task["_id"]
449 target_vim = self.my_vims[ro_task["target_id"]]
450 vim_id = ro_task["vim_info"]["vim_id"]
451
452 if not vim_id:
453 return None, None
454
455 vm_to_refresh_list = [vim_id]
456 try:
457 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
458 vim_info = vim_dict[vim_id]
459
460 if vim_info["status"] == "ACTIVE":
461 task_status = "DONE"
462 elif vim_info["status"] == "BUILD":
463 task_status = "BUILD"
464 else:
465 task_status = "FAILED"
466
467 # try to load and parse vim_information
468 try:
469 vim_info_info = yaml.safe_load(vim_info["vim_info"])
470 if vim_info_info.get("name"):
471 vim_info["name"] = vim_info_info["name"]
472 except Exception:
473 pass
474 except vimconn.VimConnException as e:
475 # Mark all tasks at VIM_ERROR status
476 self.logger.error(
477 "ro_task={} vim={} get-vm={}: {}".format(
478 ro_task_id, ro_task["target_id"], vim_id, e
479 )
480 )
481 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
482 task_status = "FAILED"
483
484 ro_vim_item_update = {}
485
486 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
487 vim_interfaces = []
488 if vim_info.get("interfaces"):
489 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
490 iface = next(
491 (
492 iface
493 for iface in vim_info["interfaces"]
494 if vim_iface_id == iface["vim_interface_id"]
495 ),
496 None,
497 )
498 # if iface:
499 # iface.pop("vim_info", None)
500 vim_interfaces.append(iface)
501
502 task_create = next(
503 t
504 for t in ro_task["tasks"]
505 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
506 )
507 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
508 vim_interfaces[task_create["mgmt_vnf_interface"]][
509 "mgmt_vnf_interface"
510 ] = True
511
512 mgmt_vdu_iface = task_create.get(
513 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
514 )
515 if vim_interfaces:
516 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
517
518 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
519 ro_vim_item_update["interfaces"] = vim_interfaces
520
521 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
522 ro_vim_item_update["vim_status"] = vim_info["status"]
523
524 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
525 ro_vim_item_update["vim_name"] = vim_info.get("name")
526
527 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
528 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
529 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
530 elif vim_info["status"] == "DELETED":
531 ro_vim_item_update["vim_id"] = None
532 ro_vim_item_update["vim_details"] = "Deleted externally"
533 else:
534 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
535 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
536
537 if ro_vim_item_update:
538 self.logger.debug(
539 "ro_task={} {} get-vm={}: status={} {}".format(
540 ro_task_id,
541 ro_task["target_id"],
542 vim_id,
543 ro_vim_item_update.get("vim_status"),
544 ro_vim_item_update.get("vim_details")
545 if ro_vim_item_update.get("vim_status") != "ACTIVE"
546 else "",
547 )
548 )
549
550 return task_status, ro_vim_item_update
551
552 def exec(self, ro_task, task_index, task_depends):
553 task = ro_task["tasks"][task_index]
554 task_id = task["task_id"]
555 target_vim = self.my_vims[ro_task["target_id"]]
556 db_task_update = {"retries": 0}
557 retries = task.get("retries", 0)
558
559 try:
560 params = task["params"]
561 params_copy = deepcopy(params)
562 params_copy["ro_key"] = self.db.decrypt(
563 params_copy.pop("private_key"),
564 params_copy.pop("schema_version"),
565 params_copy.pop("salt"),
566 )
567 params_copy["ip_addr"] = params_copy.pop("ip_address")
568 target_vim.inject_user_key(**params_copy)
569 self.logger.debug(
570 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
571 )
572
573 return (
574 "DONE",
575 None,
576 db_task_update,
577 ) # params_copy["key"]
578 except (vimconn.VimConnException, NsWorkerException) as e:
579 retries += 1
580
581 if retries < self.max_retries_inject_ssh_key:
582 return (
583 "BUILD",
584 None,
585 {
586 "retries": retries,
587 "next_retry": self.time_retries_inject_ssh_key,
588 },
589 )
590
591 self.logger.error(
592 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
593 )
594 ro_vim_item_update = {"vim_details": str(e)}
595
596 return "FAILED", ro_vim_item_update, db_task_update
597
598
599 class VimInteractionImage(VimInteractionBase):
600 def new(self, ro_task, task_index, task_depends):
601 task = ro_task["tasks"][task_index]
602 task_id = task["task_id"]
603 created = False
604 created_items = {}
605 target_vim = self.my_vims[ro_task["target_id"]]
606
607 try:
608 # FIND
609 if task.get("find_params"):
610 vim_images = target_vim.get_image_list(**task["find_params"])
611
612 if not vim_images:
613 raise NsWorkerExceptionNotFound(
614 "Image not found with this criteria: '{}'".format(
615 task["find_params"]
616 )
617 )
618 elif len(vim_images) > 1:
619 raise NsWorkerException(
620 "More than one image found with this criteria: '{}'".format(
621 task["find_params"]
622 )
623 )
624 else:
625 vim_image_id = vim_images[0]["id"]
626
627 ro_vim_item_update = {
628 "vim_id": vim_image_id,
629 "vim_status": "DONE",
630 "created": created,
631 "created_items": created_items,
632 "vim_details": None,
633 }
634 self.logger.debug(
635 "task={} {} new-image={} created={}".format(
636 task_id, ro_task["target_id"], vim_image_id, created
637 )
638 )
639
640 return "DONE", ro_vim_item_update
641 except (NsWorkerException, vimconn.VimConnException) as e:
642 self.logger.error(
643 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
644 )
645 ro_vim_item_update = {
646 "vim_status": "VIM_ERROR",
647 "created": created,
648 "vim_details": str(e),
649 }
650
651 return "FAILED", ro_vim_item_update
652
653
654 class VimInteractionFlavor(VimInteractionBase):
655 def delete(self, ro_task, task_index):
656 task = ro_task["tasks"][task_index]
657 task_id = task["task_id"]
658 flavor_vim_id = ro_task["vim_info"]["vim_id"]
659 ro_vim_item_update_ok = {
660 "vim_status": "DELETED",
661 "created": False,
662 "vim_details": "DELETED",
663 "vim_id": None,
664 }
665
666 try:
667 if flavor_vim_id:
668 target_vim = self.my_vims[ro_task["target_id"]]
669 target_vim.delete_flavor(flavor_vim_id)
670 except vimconn.VimConnNotFoundException:
671 ro_vim_item_update_ok["vim_details"] = "already deleted"
672 except vimconn.VimConnException as e:
673 self.logger.error(
674 "ro_task={} vim={} del-flavor={}: {}".format(
675 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
676 )
677 )
678 ro_vim_item_update = {
679 "vim_status": "VIM_ERROR",
680 "vim_details": "Error while deleting: {}".format(e),
681 }
682
683 return "FAILED", ro_vim_item_update
684
685 self.logger.debug(
686 "task={} {} del-flavor={} {}".format(
687 task_id,
688 ro_task["target_id"],
689 flavor_vim_id,
690 ro_vim_item_update_ok.get("vim_details", ""),
691 )
692 )
693
694 return "DONE", ro_vim_item_update_ok
695
696 def new(self, ro_task, task_index, task_depends):
697 task = ro_task["tasks"][task_index]
698 task_id = task["task_id"]
699 created = False
700 created_items = {}
701 target_vim = self.my_vims[ro_task["target_id"]]
702
703 try:
704 # FIND
705 vim_flavor_id = None
706
707 if task.get("find_params"):
708 try:
709 flavor_data = task["find_params"]["flavor_data"]
710 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
711 except vimconn.VimConnNotFoundException:
712 pass
713
714 if not vim_flavor_id and task.get("params"):
715 # CREATE
716 flavor_data = task["params"]["flavor_data"]
717 vim_flavor_id = target_vim.new_flavor(flavor_data)
718 created = True
719
720 ro_vim_item_update = {
721 "vim_id": vim_flavor_id,
722 "vim_status": "DONE",
723 "created": created,
724 "created_items": created_items,
725 "vim_details": None,
726 }
727 self.logger.debug(
728 "task={} {} new-flavor={} created={}".format(
729 task_id, ro_task["target_id"], vim_flavor_id, created
730 )
731 )
732
733 return "DONE", ro_vim_item_update
734 except (vimconn.VimConnException, NsWorkerException) as e:
735 self.logger.error(
736 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
737 )
738 ro_vim_item_update = {
739 "vim_status": "VIM_ERROR",
740 "created": created,
741 "vim_details": str(e),
742 }
743
744 return "FAILED", ro_vim_item_update
745
746
747 class VimInteractionSdnNet(VimInteractionBase):
748 @staticmethod
749 def _match_pci(port_pci, mapping):
750 """
751 Check if port_pci matches with mapping
752 mapping can have brackets to indicate that several chars are accepted. e.g
753 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
754 :param port_pci: text
755 :param mapping: text, can contain brackets to indicate several chars are available
756 :return: True if matches, False otherwise
757 """
758 if not port_pci or not mapping:
759 return False
760 if port_pci == mapping:
761 return True
762
763 mapping_index = 0
764 pci_index = 0
765 while True:
766 bracket_start = mapping.find("[", mapping_index)
767
768 if bracket_start == -1:
769 break
770
771 bracket_end = mapping.find("]", bracket_start)
772 if bracket_end == -1:
773 break
774
775 length = bracket_start - mapping_index
776 if (
777 length
778 and port_pci[pci_index : pci_index + length]
779 != mapping[mapping_index:bracket_start]
780 ):
781 return False
782
783 if (
784 port_pci[pci_index + length]
785 not in mapping[bracket_start + 1 : bracket_end]
786 ):
787 return False
788
789 pci_index += length + 1
790 mapping_index = bracket_end + 1
791
792 if port_pci[pci_index:] != mapping[mapping_index:]:
793 return False
794
795 return True
796
797 def _get_interfaces(self, vlds_to_connect, vim_account_id):
798 """
799 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
800 :param vim_account_id:
801 :return:
802 """
803 interfaces = []
804
805 for vld in vlds_to_connect:
806 table, _, db_id = vld.partition(":")
807 db_id, _, vld = db_id.partition(":")
808 _, _, vld_id = vld.partition(".")
809
810 if table == "vnfrs":
811 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
812 iface_key = "vnf-vld-id"
813 else: # table == "nsrs"
814 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
815 iface_key = "ns-vld-id"
816
817 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
818
819 for db_vnfr in db_vnfrs:
820 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
821 for iface_index, interface in enumerate(vdur["interfaces"]):
822 if interface.get(iface_key) == vld_id and interface.get(
823 "type"
824 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
825 # only SR-IOV o PT
826 interface_ = interface.copy()
827 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
828 db_vnfr["_id"], vdu_index, iface_index
829 )
830
831 if vdur.get("status") == "ERROR":
832 interface_["status"] = "ERROR"
833
834 interfaces.append(interface_)
835
836 return interfaces
837
838 def refresh(self, ro_task):
839 # look for task create
840 task_create_index, _ = next(
841 i_t
842 for i_t in enumerate(ro_task["tasks"])
843 if i_t[1]
844 and i_t[1]["action"] == "CREATE"
845 and i_t[1]["status"] != "FINISHED"
846 )
847
848 return self.new(ro_task, task_create_index, None)
849
850 def new(self, ro_task, task_index, task_depends):
851
852 task = ro_task["tasks"][task_index]
853 task_id = task["task_id"]
854 target_vim = self.my_vims[ro_task["target_id"]]
855
856 sdn_net_id = ro_task["vim_info"]["vim_id"]
857
858 created_items = ro_task["vim_info"].get("created_items")
859 connected_ports = ro_task["vim_info"].get("connected_ports", [])
860 new_connected_ports = []
861 last_update = ro_task["vim_info"].get("last_update", 0)
862 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
863 error_list = []
864 created = ro_task["vim_info"].get("created", False)
865
866 try:
867 # CREATE
868 params = task["params"]
869 vlds_to_connect = params["vlds"]
870 associated_vim = params["target_vim"]
871 # external additional ports
872 additional_ports = params.get("sdn-ports") or ()
873 _, _, vim_account_id = associated_vim.partition(":")
874
875 if associated_vim:
876 # get associated VIM
877 if associated_vim not in self.db_vims:
878 self.db_vims[associated_vim] = self.db.get_one(
879 "vim_accounts", {"_id": vim_account_id}
880 )
881
882 db_vim = self.db_vims[associated_vim]
883
884 # look for ports to connect
885 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
886 # print(ports)
887
888 sdn_ports = []
889 pending_ports = error_ports = 0
890 vlan_used = None
891 sdn_need_update = False
892
893 for port in ports:
894 vlan_used = port.get("vlan") or vlan_used
895
896 # TODO. Do not connect if already done
897 if not port.get("compute_node") or not port.get("pci"):
898 if port.get("status") == "ERROR":
899 error_ports += 1
900 else:
901 pending_ports += 1
902 continue
903
904 pmap = None
905 compute_node_mappings = next(
906 (
907 c
908 for c in db_vim["config"].get("sdn-port-mapping", ())
909 if c and c["compute_node"] == port["compute_node"]
910 ),
911 None,
912 )
913
914 if compute_node_mappings:
915 # process port_mapping pci of type 0000:af:1[01].[1357]
916 pmap = next(
917 (
918 p
919 for p in compute_node_mappings["ports"]
920 if self._match_pci(port["pci"], p.get("pci"))
921 ),
922 None,
923 )
924
925 if not pmap:
926 if not db_vim["config"].get("mapping_not_needed"):
927 error_list.append(
928 "Port mapping not found for compute_node={} pci={}".format(
929 port["compute_node"], port["pci"]
930 )
931 )
932 continue
933
934 pmap = {}
935
936 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
937 new_port = {
938 "service_endpoint_id": pmap.get("service_endpoint_id")
939 or service_endpoint_id,
940 "service_endpoint_encapsulation_type": "dot1q"
941 if port["type"] == "SR-IOV"
942 else None,
943 "service_endpoint_encapsulation_info": {
944 "vlan": port.get("vlan"),
945 "mac": port.get("mac-address"),
946 "device_id": pmap.get("device_id") or port["compute_node"],
947 "device_interface_id": pmap.get("device_interface_id")
948 or port["pci"],
949 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
950 "switch_port": pmap.get("switch_port"),
951 "service_mapping_info": pmap.get("service_mapping_info"),
952 },
953 }
954
955 # TODO
956 # if port["modified_at"] > last_update:
957 # sdn_need_update = True
958 new_connected_ports.append(port["id"]) # TODO
959 sdn_ports.append(new_port)
960
961 if error_ports:
962 error_list.append(
963 "{} interfaces have not been created as VDU is on ERROR status".format(
964 error_ports
965 )
966 )
967
968 # connect external ports
969 for index, additional_port in enumerate(additional_ports):
970 additional_port_id = additional_port.get(
971 "service_endpoint_id"
972 ) or "external-{}".format(index)
973 sdn_ports.append(
974 {
975 "service_endpoint_id": additional_port_id,
976 "service_endpoint_encapsulation_type": additional_port.get(
977 "service_endpoint_encapsulation_type", "dot1q"
978 ),
979 "service_endpoint_encapsulation_info": {
980 "vlan": additional_port.get("vlan") or vlan_used,
981 "mac": additional_port.get("mac_address"),
982 "device_id": additional_port.get("device_id"),
983 "device_interface_id": additional_port.get(
984 "device_interface_id"
985 ),
986 "switch_dpid": additional_port.get("switch_dpid")
987 or additional_port.get("switch_id"),
988 "switch_port": additional_port.get("switch_port"),
989 "service_mapping_info": additional_port.get(
990 "service_mapping_info"
991 ),
992 },
993 }
994 )
995 new_connected_ports.append(additional_port_id)
996 sdn_info = ""
997
998 # if there are more ports to connect or they have been modified, call create/update
999 if error_list:
1000 sdn_status = "ERROR"
1001 sdn_info = "; ".join(error_list)
1002 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1003 last_update = time.time()
1004
1005 if not sdn_net_id:
1006 if len(sdn_ports) < 2:
1007 sdn_status = "ACTIVE"
1008
1009 if not pending_ports:
1010 self.logger.debug(
1011 "task={} {} new-sdn-net done, less than 2 ports".format(
1012 task_id, ro_task["target_id"]
1013 )
1014 )
1015 else:
1016 net_type = params.get("type") or "ELAN"
1017 (
1018 sdn_net_id,
1019 created_items,
1020 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1021 created = True
1022 self.logger.debug(
1023 "task={} {} new-sdn-net={} created={}".format(
1024 task_id, ro_task["target_id"], sdn_net_id, created
1025 )
1026 )
1027 else:
1028 created_items = target_vim.edit_connectivity_service(
1029 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1030 )
1031 created = True
1032 self.logger.debug(
1033 "task={} {} update-sdn-net={} created={}".format(
1034 task_id, ro_task["target_id"], sdn_net_id, created
1035 )
1036 )
1037
1038 connected_ports = new_connected_ports
1039 elif sdn_net_id:
1040 wim_status_dict = target_vim.get_connectivity_service_status(
1041 sdn_net_id, conn_info=created_items
1042 )
1043 sdn_status = wim_status_dict["sdn_status"]
1044
1045 if wim_status_dict.get("sdn_info"):
1046 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1047
1048 if wim_status_dict.get("error_msg"):
1049 sdn_info = wim_status_dict.get("error_msg") or ""
1050
1051 if pending_ports:
1052 if sdn_status != "ERROR":
1053 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1054 len(ports) - pending_ports, len(ports)
1055 )
1056
1057 if sdn_status == "ACTIVE":
1058 sdn_status = "BUILD"
1059
1060 ro_vim_item_update = {
1061 "vim_id": sdn_net_id,
1062 "vim_status": sdn_status,
1063 "created": created,
1064 "created_items": created_items,
1065 "connected_ports": connected_ports,
1066 "vim_details": sdn_info,
1067 "last_update": last_update,
1068 }
1069
1070 return sdn_status, ro_vim_item_update
1071 except Exception as e:
1072 self.logger.error(
1073 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1074 exc_info=not isinstance(
1075 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1076 ),
1077 )
1078 ro_vim_item_update = {
1079 "vim_status": "VIM_ERROR",
1080 "created": created,
1081 "vim_details": str(e),
1082 }
1083
1084 return "FAILED", ro_vim_item_update
1085
1086 def delete(self, ro_task, task_index):
1087 task = ro_task["tasks"][task_index]
1088 task_id = task["task_id"]
1089 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1090 ro_vim_item_update_ok = {
1091 "vim_status": "DELETED",
1092 "created": False,
1093 "vim_details": "DELETED",
1094 "vim_id": None,
1095 }
1096
1097 try:
1098 if sdn_vim_id:
1099 target_vim = self.my_vims[ro_task["target_id"]]
1100 target_vim.delete_connectivity_service(
1101 sdn_vim_id, ro_task["vim_info"].get("created_items")
1102 )
1103
1104 except Exception as e:
1105 if (
1106 isinstance(e, sdnconn.SdnConnectorError)
1107 and e.http_code == HTTPStatus.NOT_FOUND.value
1108 ):
1109 ro_vim_item_update_ok["vim_details"] = "already deleted"
1110 else:
1111 self.logger.error(
1112 "ro_task={} vim={} del-sdn-net={}: {}".format(
1113 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1114 ),
1115 exc_info=not isinstance(
1116 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1117 ),
1118 )
1119 ro_vim_item_update = {
1120 "vim_status": "VIM_ERROR",
1121 "vim_details": "Error while deleting: {}".format(e),
1122 }
1123
1124 return "FAILED", ro_vim_item_update
1125
1126 self.logger.debug(
1127 "task={} {} del-sdn-net={} {}".format(
1128 task_id,
1129 ro_task["target_id"],
1130 sdn_vim_id,
1131 ro_vim_item_update_ok.get("vim_details", ""),
1132 )
1133 )
1134
1135 return "DONE", ro_vim_item_update_ok
1136
1137
1138 class NsWorker(threading.Thread):
1139 REFRESH_BUILD = 5 # 5 seconds
1140 REFRESH_ACTIVE = 60 # 1 minute
1141 REFRESH_ERROR = 600
1142 REFRESH_IMAGE = 3600 * 10
1143 REFRESH_DELETE = 3600 * 10
1144 QUEUE_SIZE = 100
1145 terminate = False
1146
1147 def __init__(self, worker_index, config, plugins, db):
1148 """
1149
1150 :param worker_index: thread index
1151 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1152 :param plugins: global shared dict with the loaded plugins
1153 :param db: database class instance to use
1154 """
1155 threading.Thread.__init__(self)
1156 self.config = config
1157 self.plugins = plugins
1158 self.plugin_name = "unknown"
1159 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1160 self.worker_index = worker_index
1161 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1162 # targetvim: vimplugin class
1163 self.my_vims = {}
1164 # targetvim: vim information from database
1165 self.db_vims = {}
1166 # targetvim list
1167 self.vim_targets = []
1168 self.my_id = config["process_id"] + ":" + str(worker_index)
1169 self.db = db
1170 self.item2class = {
1171 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1172 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1173 "image": VimInteractionImage(
1174 self.db, self.my_vims, self.db_vims, self.logger
1175 ),
1176 "flavor": VimInteractionFlavor(
1177 self.db, self.my_vims, self.db_vims, self.logger
1178 ),
1179 "sdn_net": VimInteractionSdnNet(
1180 self.db, self.my_vims, self.db_vims, self.logger
1181 ),
1182 }
1183 self.time_last_task_processed = None
1184 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1185 self.tasks_to_delete = []
1186 # it is idle when there are not vim_targets associated
1187 self.idle = True
1188 self.task_locked_time = config["global"]["task_locked_time"]
1189
1190 def insert_task(self, task):
1191 try:
1192 self.task_queue.put(task, False)
1193 return None
1194 except queue.Full:
1195 raise NsWorkerException("timeout inserting a task")
1196
1197 def terminate(self):
1198 self.insert_task("exit")
1199
1200 def del_task(self, task):
1201 with self.task_lock:
1202 if task["status"] == "SCHEDULED":
1203 task["status"] = "SUPERSEDED"
1204 return True
1205 else: # task["status"] == "processing"
1206 self.task_lock.release()
1207 return False
1208
1209 def _process_vim_config(self, target_id, db_vim):
1210 """
1211 Process vim config, creating vim configuration files as ca_cert
1212 :param target_id: vim/sdn/wim + id
1213 :param db_vim: Vim dictionary obtained from database
1214 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1215 """
1216 if not db_vim.get("config"):
1217 return
1218
1219 file_name = ""
1220
1221 try:
1222 if db_vim["config"].get("ca_cert_content"):
1223 file_name = "{}:{}".format(target_id, self.worker_index)
1224
1225 try:
1226 mkdir(file_name)
1227 except FileExistsError:
1228 pass
1229
1230 file_name = file_name + "/ca_cert"
1231
1232 with open(file_name, "w") as f:
1233 f.write(db_vim["config"]["ca_cert_content"])
1234 del db_vim["config"]["ca_cert_content"]
1235 db_vim["config"]["ca_cert"] = file_name
1236 except Exception as e:
1237 raise NsWorkerException(
1238 "Error writing to file '{}': {}".format(file_name, e)
1239 )
1240
1241 def _load_plugin(self, name, type="vim"):
1242 # type can be vim or sdn
1243 if "rovim_dummy" not in self.plugins:
1244 self.plugins["rovim_dummy"] = VimDummyConnector
1245
1246 if "rosdn_dummy" not in self.plugins:
1247 self.plugins["rosdn_dummy"] = SdnDummyConnector
1248
1249 if name in self.plugins:
1250 return self.plugins[name]
1251
1252 try:
1253 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1254 self.plugins[name] = ep.load()
1255 except Exception as e:
1256 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1257
1258 if name and name not in self.plugins:
1259 raise NsWorkerException(
1260 "Plugin 'osm_{n}' has not been installed".format(n=name)
1261 )
1262
1263 return self.plugins[name]
1264
1265 def _unload_vim(self, target_id):
1266 """
1267 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1268 :param target_id: Contains type:_id; where type can be 'vim', ...
1269 :return: None.
1270 """
1271 try:
1272 self.db_vims.pop(target_id, None)
1273 self.my_vims.pop(target_id, None)
1274
1275 if target_id in self.vim_targets:
1276 self.vim_targets.remove(target_id)
1277
1278 self.logger.info("Unloaded {}".format(target_id))
1279 rmtree("{}:{}".format(target_id, self.worker_index))
1280 except FileNotFoundError:
1281 pass # this is raised by rmtree if folder does not exist
1282 except Exception as e:
1283 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1284
1285 def _check_vim(self, target_id):
1286 """
1287 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1288 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1289 :return: None.
1290 """
1291 target, _, _id = target_id.partition(":")
1292 now = time.time()
1293 update_dict = {}
1294 unset_dict = {}
1295 op_text = ""
1296 step = ""
1297 loaded = target_id in self.vim_targets
1298 target_database = (
1299 "vim_accounts"
1300 if target == "vim"
1301 else "wim_accounts"
1302 if target == "wim"
1303 else "sdns"
1304 )
1305
1306 try:
1307 step = "Getting {} from db".format(target_id)
1308 db_vim = self.db.get_one(target_database, {"_id": _id})
1309
1310 for op_index, operation in enumerate(
1311 db_vim["_admin"].get("operations", ())
1312 ):
1313 if operation["operationState"] != "PROCESSING":
1314 continue
1315
1316 locked_at = operation.get("locked_at")
1317
1318 if locked_at is not None and locked_at >= now - self.task_locked_time:
1319 # some other thread is doing this operation
1320 return
1321
1322 # lock
1323 op_text = "_admin.operations.{}.".format(op_index)
1324
1325 if not self.db.set_one(
1326 target_database,
1327 q_filter={
1328 "_id": _id,
1329 op_text + "operationState": "PROCESSING",
1330 op_text + "locked_at": locked_at,
1331 },
1332 update_dict={
1333 op_text + "locked_at": now,
1334 "admin.current_operation": op_index,
1335 },
1336 fail_on_empty=False,
1337 ):
1338 return
1339
1340 unset_dict[op_text + "locked_at"] = None
1341 unset_dict["current_operation"] = None
1342 step = "Loading " + target_id
1343 error_text = self._load_vim(target_id)
1344
1345 if not error_text:
1346 step = "Checking connectivity"
1347
1348 if target == "vim":
1349 self.my_vims[target_id].check_vim_connectivity()
1350 else:
1351 self.my_vims[target_id].check_credentials()
1352
1353 update_dict["_admin.operationalState"] = "ENABLED"
1354 update_dict["_admin.detailed-status"] = ""
1355 unset_dict[op_text + "detailed-status"] = None
1356 update_dict[op_text + "operationState"] = "COMPLETED"
1357
1358 return
1359
1360 except Exception as e:
1361 error_text = "{}: {}".format(step, e)
1362 self.logger.error("{} for {}: {}".format(step, target_id, e))
1363
1364 finally:
1365 if update_dict or unset_dict:
1366 if error_text:
1367 update_dict[op_text + "operationState"] = "FAILED"
1368 update_dict[op_text + "detailed-status"] = error_text
1369 unset_dict.pop(op_text + "detailed-status", None)
1370 update_dict["_admin.operationalState"] = "ERROR"
1371 update_dict["_admin.detailed-status"] = error_text
1372
1373 if op_text:
1374 update_dict[op_text + "statusEnteredTime"] = now
1375
1376 self.db.set_one(
1377 target_database,
1378 q_filter={"_id": _id},
1379 update_dict=update_dict,
1380 unset=unset_dict,
1381 fail_on_empty=False,
1382 )
1383
1384 if not loaded:
1385 self._unload_vim(target_id)
1386
1387 def _reload_vim(self, target_id):
1388 if target_id in self.vim_targets:
1389 self._load_vim(target_id)
1390 else:
1391 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1392 # just remove it to force load again next time it is needed
1393 self.db_vims.pop(target_id, None)
1394
1395 def _load_vim(self, target_id):
1396 """
1397 Load or reload a vim_account, sdn_controller or wim_account.
1398 Read content from database, load the plugin if not loaded.
1399 In case of error loading the plugin, it load a failing VIM_connector
1400 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1401 :param target_id: Contains type:_id; where type can be 'vim', ...
1402 :return: None if ok, descriptive text if error
1403 """
1404 target, _, _id = target_id.partition(":")
1405 target_database = (
1406 "vim_accounts"
1407 if target == "vim"
1408 else "wim_accounts"
1409 if target == "wim"
1410 else "sdns"
1411 )
1412 plugin_name = ""
1413 vim = None
1414
1415 try:
1416 step = "Getting {}={} from db".format(target, _id)
1417 # TODO process for wim, sdnc, ...
1418 vim = self.db.get_one(target_database, {"_id": _id})
1419
1420 # if deep_get(vim, "config", "sdn-controller"):
1421 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1422 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1423
1424 step = "Decrypting password"
1425 schema_version = vim.get("schema_version")
1426 self.db.encrypt_decrypt_fields(
1427 vim,
1428 "decrypt",
1429 fields=("password", "secret"),
1430 schema_version=schema_version,
1431 salt=_id,
1432 )
1433 self._process_vim_config(target_id, vim)
1434
1435 if target == "vim":
1436 plugin_name = "rovim_" + vim["vim_type"]
1437 step = "Loading plugin '{}'".format(plugin_name)
1438 vim_module_conn = self._load_plugin(plugin_name)
1439 step = "Loading {}'".format(target_id)
1440 self.my_vims[target_id] = vim_module_conn(
1441 uuid=vim["_id"],
1442 name=vim["name"],
1443 tenant_id=vim.get("vim_tenant_id"),
1444 tenant_name=vim.get("vim_tenant_name"),
1445 url=vim["vim_url"],
1446 url_admin=None,
1447 user=vim["vim_user"],
1448 passwd=vim["vim_password"],
1449 config=vim.get("config") or {},
1450 persistent_info={},
1451 )
1452 else: # sdn
1453 plugin_name = "rosdn_" + vim["type"]
1454 step = "Loading plugin '{}'".format(plugin_name)
1455 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1456 step = "Loading {}'".format(target_id)
1457 wim = deepcopy(vim)
1458 wim_config = wim.pop("config", {}) or {}
1459 wim["uuid"] = wim["_id"]
1460 wim["wim_url"] = wim["url"]
1461
1462 if wim.get("dpid"):
1463 wim_config["dpid"] = wim.pop("dpid")
1464
1465 if wim.get("switch_id"):
1466 wim_config["switch_id"] = wim.pop("switch_id")
1467
1468 # wim, wim_account, config
1469 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1470 self.db_vims[target_id] = vim
1471 self.error_status = None
1472
1473 self.logger.info(
1474 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1475 )
1476 except Exception as e:
1477 self.logger.error(
1478 "Cannot load {} plugin={}: {} {}".format(
1479 target_id, plugin_name, step, e
1480 )
1481 )
1482
1483 self.db_vims[target_id] = vim or {}
1484 self.db_vims[target_id] = FailingConnector(str(e))
1485 error_status = "{} Error: {}".format(step, e)
1486
1487 return error_status
1488 finally:
1489 if target_id not in self.vim_targets:
1490 self.vim_targets.append(target_id)
1491
1492 def _get_db_task(self):
1493 """
1494 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1495 :return: None
1496 """
1497 now = time.time()
1498
1499 if not self.time_last_task_processed:
1500 self.time_last_task_processed = now
1501
1502 try:
1503 while True:
1504 locked = self.db.set_one(
1505 "ro_tasks",
1506 q_filter={
1507 "target_id": self.vim_targets,
1508 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1509 "locked_at.lt": now - self.task_locked_time,
1510 "to_check_at.lt": self.time_last_task_processed,
1511 },
1512 update_dict={"locked_by": self.my_id, "locked_at": now},
1513 fail_on_empty=False,
1514 )
1515
1516 if locked:
1517 # read and return
1518 ro_task = self.db.get_one(
1519 "ro_tasks",
1520 q_filter={
1521 "target_id": self.vim_targets,
1522 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1523 "locked_at": now,
1524 },
1525 )
1526 return ro_task
1527
1528 if self.time_last_task_processed == now:
1529 self.time_last_task_processed = None
1530 return None
1531 else:
1532 self.time_last_task_processed = now
1533 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1534
1535 except DbException as e:
1536 self.logger.error("Database exception at _get_db_task: {}".format(e))
1537 except Exception as e:
1538 self.logger.critical(
1539 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1540 )
1541
1542 return None
1543
1544 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1545 """
1546 Determine if this task need to be done or superseded
1547 :return: None
1548 """
1549 my_task = ro_task["tasks"][task_index]
1550 task_id = my_task["task_id"]
1551 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1552 "created_items", False
1553 )
1554
1555 if my_task["status"] == "FAILED":
1556 return None, None # TODO need to be retry??
1557
1558 try:
1559 for index, task in enumerate(ro_task["tasks"]):
1560 if index == task_index or not task:
1561 continue # own task
1562
1563 if (
1564 my_task["target_record"] == task["target_record"]
1565 and task["action"] == "CREATE"
1566 ):
1567 # set to finished
1568 db_update["tasks.{}.status".format(index)] = task[
1569 "status"
1570 ] = "FINISHED"
1571 elif task["action"] == "CREATE" and task["status"] not in (
1572 "FINISHED",
1573 "SUPERSEDED",
1574 ):
1575 needed_delete = False
1576
1577 if needed_delete:
1578 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1579 else:
1580 return "SUPERSEDED", None
1581 except Exception as e:
1582 if not isinstance(e, NsWorkerException):
1583 self.logger.critical(
1584 "Unexpected exception at _delete_task task={}: {}".format(
1585 task_id, e
1586 ),
1587 exc_info=True,
1588 )
1589
1590 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1591
1592 def _create_task(self, ro_task, task_index, task_depends, db_update):
1593 """
1594 Determine if this task need to create something at VIM
1595 :return: None
1596 """
1597 my_task = ro_task["tasks"][task_index]
1598 task_id = my_task["task_id"]
1599 task_status = None
1600
1601 if my_task["status"] == "FAILED":
1602 return None, None # TODO need to be retry??
1603 elif my_task["status"] == "SCHEDULED":
1604 # check if already created by another task
1605 for index, task in enumerate(ro_task["tasks"]):
1606 if index == task_index or not task:
1607 continue # own task
1608
1609 if task["action"] == "CREATE" and task["status"] not in (
1610 "SCHEDULED",
1611 "FINISHED",
1612 "SUPERSEDED",
1613 ):
1614 return task["status"], "COPY_VIM_INFO"
1615
1616 try:
1617 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1618 ro_task, task_index, task_depends
1619 )
1620 # TODO update other CREATE tasks
1621 except Exception as e:
1622 if not isinstance(e, NsWorkerException):
1623 self.logger.error(
1624 "Error executing task={}: {}".format(task_id, e), exc_info=True
1625 )
1626
1627 task_status = "FAILED"
1628 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1629 # TODO update ro_vim_item_update
1630
1631 return task_status, ro_vim_item_update
1632 else:
1633 return None, None
1634
1635 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1636 """
1637 Look for dependency task
1638 :param task_id: Can be one of
1639 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1640 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1641 3. task.task_id: "<action_id>:number"
1642 :param ro_task:
1643 :param target_id:
1644 :return: database ro_task plus index of task
1645 """
1646 if (
1647 task_id.startswith("vim:")
1648 or task_id.startswith("sdn:")
1649 or task_id.startswith("wim:")
1650 ):
1651 target_id, _, task_id = task_id.partition(" ")
1652
1653 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1654 ro_task_dependency = self.db.get_one(
1655 "ro_tasks",
1656 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1657 fail_on_empty=False,
1658 )
1659
1660 if ro_task_dependency:
1661 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1662 if task["target_record_id"] == task_id:
1663 return ro_task_dependency, task_index
1664
1665 else:
1666 if ro_task:
1667 for task_index, task in enumerate(ro_task["tasks"]):
1668 if task and task["task_id"] == task_id:
1669 return ro_task, task_index
1670
1671 ro_task_dependency = self.db.get_one(
1672 "ro_tasks",
1673 q_filter={
1674 "tasks.ANYINDEX.task_id": task_id,
1675 "tasks.ANYINDEX.target_record.ne": None,
1676 },
1677 fail_on_empty=False,
1678 )
1679
1680 if ro_task_dependency:
1681 for task_index, task in ro_task_dependency["tasks"]:
1682 if task["task_id"] == task_id:
1683 return ro_task_dependency, task_index
1684 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1685
1686 def _process_pending_tasks(self, ro_task):
1687 ro_task_id = ro_task["_id"]
1688 now = time.time()
1689 # one day
1690 next_check_at = now + (24 * 60 * 60)
1691 db_ro_task_update = {}
1692
1693 def _update_refresh(new_status):
1694 # compute next_refresh
1695 nonlocal task
1696 nonlocal next_check_at
1697 nonlocal db_ro_task_update
1698 nonlocal ro_task
1699
1700 next_refresh = time.time()
1701
1702 if task["item"] in ("image", "flavor"):
1703 next_refresh += self.REFRESH_IMAGE
1704 elif new_status == "BUILD":
1705 next_refresh += self.REFRESH_BUILD
1706 elif new_status == "DONE":
1707 next_refresh += self.REFRESH_ACTIVE
1708 else:
1709 next_refresh += self.REFRESH_ERROR
1710
1711 next_check_at = min(next_check_at, next_refresh)
1712 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1713 ro_task["vim_info"]["refresh_at"] = next_refresh
1714
1715 try:
1716 # 0: get task_status_create
1717 lock_object = None
1718 task_status_create = None
1719 task_create = next(
1720 (
1721 t
1722 for t in ro_task["tasks"]
1723 if t
1724 and t["action"] == "CREATE"
1725 and t["status"] in ("BUILD", "DONE")
1726 ),
1727 None,
1728 )
1729
1730 if task_create:
1731 task_status_create = task_create["status"]
1732
1733 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1734 for task_action in ("DELETE", "CREATE", "EXEC"):
1735 db_vim_update = None
1736 new_status = None
1737
1738 for task_index, task in enumerate(ro_task["tasks"]):
1739 if not task:
1740 continue # task deleted
1741
1742 task_depends = {}
1743 target_update = None
1744
1745 if (
1746 (
1747 task_action in ("DELETE", "EXEC")
1748 and task["status"] not in ("SCHEDULED", "BUILD")
1749 )
1750 or task["action"] != task_action
1751 or (
1752 task_action == "CREATE"
1753 and task["status"] in ("FINISHED", "SUPERSEDED")
1754 )
1755 ):
1756 continue
1757
1758 task_path = "tasks.{}.status".format(task_index)
1759 try:
1760 db_vim_info_update = None
1761
1762 if task["status"] == "SCHEDULED":
1763 # check if tasks that this depends on have been completed
1764 dependency_not_completed = False
1765
1766 for dependency_task_id in task.get("depends_on") or ():
1767 (
1768 dependency_ro_task,
1769 dependency_task_index,
1770 ) = self._get_dependency(
1771 dependency_task_id, target_id=ro_task["target_id"]
1772 )
1773 dependency_task = dependency_ro_task["tasks"][
1774 dependency_task_index
1775 ]
1776
1777 if dependency_task["status"] == "SCHEDULED":
1778 dependency_not_completed = True
1779 next_check_at = min(
1780 next_check_at, dependency_ro_task["to_check_at"]
1781 )
1782 # must allow dependent task to be processed first
1783 # to do this set time after last_task_processed
1784 next_check_at = max(
1785 self.time_last_task_processed, next_check_at
1786 )
1787 break
1788 elif dependency_task["status"] == "FAILED":
1789 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1790 task["action"],
1791 task["item"],
1792 dependency_task["action"],
1793 dependency_task["item"],
1794 dependency_task_id,
1795 dependency_ro_task["vim_info"].get(
1796 "vim_details"
1797 ),
1798 )
1799 self.logger.error(
1800 "task={} {}".format(task["task_id"], error_text)
1801 )
1802 raise NsWorkerException(error_text)
1803
1804 task_depends[dependency_task_id] = dependency_ro_task[
1805 "vim_info"
1806 ]["vim_id"]
1807 task_depends[
1808 "TASK-{}".format(dependency_task_id)
1809 ] = dependency_ro_task["vim_info"]["vim_id"]
1810
1811 if dependency_not_completed:
1812 # TODO set at vim_info.vim_details that it is waiting
1813 continue
1814
1815 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1816 # the task of renew this locking. It will update database locket_at periodically
1817 if not lock_object:
1818 lock_object = LockRenew.add_lock_object(
1819 "ro_tasks", ro_task, self
1820 )
1821
1822 if task["action"] == "DELETE":
1823 (new_status, db_vim_info_update,) = self._delete_task(
1824 ro_task, task_index, task_depends, db_ro_task_update
1825 )
1826 new_status = (
1827 "FINISHED" if new_status == "DONE" else new_status
1828 )
1829 # ^with FINISHED instead of DONE it will not be refreshing
1830
1831 if new_status in ("FINISHED", "SUPERSEDED"):
1832 target_update = "DELETE"
1833 elif task["action"] == "EXEC":
1834 (
1835 new_status,
1836 db_vim_info_update,
1837 db_task_update,
1838 ) = self.item2class[task["item"]].exec(
1839 ro_task, task_index, task_depends
1840 )
1841 new_status = (
1842 "FINISHED" if new_status == "DONE" else new_status
1843 )
1844 # ^with FINISHED instead of DONE it will not be refreshing
1845
1846 if db_task_update:
1847 # load into database the modified db_task_update "retries" and "next_retry"
1848 if db_task_update.get("retries"):
1849 db_ro_task_update[
1850 "tasks.{}.retries".format(task_index)
1851 ] = db_task_update["retries"]
1852
1853 next_check_at = time.time() + db_task_update.get(
1854 "next_retry", 60
1855 )
1856 target_update = None
1857 elif task["action"] == "CREATE":
1858 if task["status"] == "SCHEDULED":
1859 if task_status_create:
1860 new_status = task_status_create
1861 target_update = "COPY_VIM_INFO"
1862 else:
1863 new_status, db_vim_info_update = self.item2class[
1864 task["item"]
1865 ].new(ro_task, task_index, task_depends)
1866 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
1867 _update_refresh(new_status)
1868 else:
1869 if (
1870 ro_task["vim_info"]["refresh_at"]
1871 and now > ro_task["vim_info"]["refresh_at"]
1872 ):
1873 new_status, db_vim_info_update = self.item2class[
1874 task["item"]
1875 ].refresh(ro_task)
1876 _update_refresh(new_status)
1877
1878 except Exception as e:
1879 new_status = "FAILED"
1880 db_vim_info_update = {
1881 "vim_status": "VIM_ERROR",
1882 "vim_details": str(e),
1883 }
1884
1885 if not isinstance(
1886 e, (NsWorkerException, vimconn.VimConnException)
1887 ):
1888 self.logger.error(
1889 "Unexpected exception at _delete_task task={}: {}".format(
1890 task["task_id"], e
1891 ),
1892 exc_info=True,
1893 )
1894
1895 try:
1896 if db_vim_info_update:
1897 db_vim_update = db_vim_info_update.copy()
1898 db_ro_task_update.update(
1899 {
1900 "vim_info." + k: v
1901 for k, v in db_vim_info_update.items()
1902 }
1903 )
1904 ro_task["vim_info"].update(db_vim_info_update)
1905
1906 if new_status:
1907 if task_action == "CREATE":
1908 task_status_create = new_status
1909 db_ro_task_update[task_path] = new_status
1910
1911 if target_update or db_vim_update:
1912 if target_update == "DELETE":
1913 self._update_target(task, None)
1914 elif target_update == "COPY_VIM_INFO":
1915 self._update_target(task, ro_task["vim_info"])
1916 else:
1917 self._update_target(task, db_vim_update)
1918
1919 except Exception as e:
1920 if (
1921 isinstance(e, DbException)
1922 and e.http_code == HTTPStatus.NOT_FOUND
1923 ):
1924 # if the vnfrs or nsrs has been removed from database, this task must be removed
1925 self.logger.debug(
1926 "marking to delete task={}".format(task["task_id"])
1927 )
1928 self.tasks_to_delete.append(task)
1929 else:
1930 self.logger.error(
1931 "Unexpected exception at _update_target task={}: {}".format(
1932 task["task_id"], e
1933 ),
1934 exc_info=True,
1935 )
1936
1937 locked_at = ro_task["locked_at"]
1938
1939 if lock_object:
1940 locked_at = [
1941 lock_object["locked_at"],
1942 lock_object["locked_at"] + self.task_locked_time,
1943 ]
1944 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
1945 # contain exactly locked_at + self.task_locked_time
1946 LockRenew.remove_lock_object(lock_object)
1947
1948 q_filter = {
1949 "_id": ro_task["_id"],
1950 "to_check_at": ro_task["to_check_at"],
1951 "locked_at": locked_at,
1952 }
1953 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
1954 # outside this task (by ro_nbi) do not update it
1955 db_ro_task_update["locked_by"] = None
1956 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
1957 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
1958 db_ro_task_update["modified_at"] = now
1959 db_ro_task_update["to_check_at"] = next_check_at
1960
1961 if not self.db.set_one(
1962 "ro_tasks",
1963 update_dict=db_ro_task_update,
1964 q_filter=q_filter,
1965 fail_on_empty=False,
1966 ):
1967 del db_ro_task_update["to_check_at"]
1968 del q_filter["to_check_at"]
1969 self.db.set_one(
1970 "ro_tasks",
1971 q_filter=q_filter,
1972 update_dict=db_ro_task_update,
1973 fail_on_empty=True,
1974 )
1975 except DbException as e:
1976 self.logger.error(
1977 "ro_task={} Error updating database {}".format(ro_task_id, e)
1978 )
1979 except Exception as e:
1980 self.logger.error(
1981 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
1982 )
1983
1984 def _update_target(self, task, ro_vim_item_update):
1985 table, _, temp = task["target_record"].partition(":")
1986 _id, _, path_vim_status = temp.partition(":")
1987 path_item = path_vim_status[: path_vim_status.rfind(".")]
1988 path_item = path_item[: path_item.rfind(".")]
1989 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
1990 # path_item: dot separated list targeting record information, e.g. "vdur.10"
1991
1992 if ro_vim_item_update:
1993 update_dict = {
1994 path_vim_status + "." + k: v
1995 for k, v in ro_vim_item_update.items()
1996 if k
1997 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
1998 }
1999
2000 if path_vim_status.startswith("vdur."):
2001 # for backward compatibility, add vdur.name apart from vdur.vim_name
2002 if ro_vim_item_update.get("vim_name"):
2003 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2004
2005 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2006 if ro_vim_item_update.get("vim_id"):
2007 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2008
2009 # update general status
2010 if ro_vim_item_update.get("vim_status"):
2011 update_dict[path_item + ".status"] = ro_vim_item_update[
2012 "vim_status"
2013 ]
2014
2015 if ro_vim_item_update.get("interfaces"):
2016 path_interfaces = path_item + ".interfaces"
2017
2018 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2019 if iface:
2020 update_dict.update(
2021 {
2022 path_interfaces + ".{}.".format(i) + k: v
2023 for k, v in iface.items()
2024 if k in ("vlan", "compute_node", "pci")
2025 }
2026 )
2027
2028 # put ip_address and mac_address with ip-address and mac-address
2029 if iface.get("ip_address"):
2030 update_dict[
2031 path_interfaces + ".{}.".format(i) + "ip-address"
2032 ] = iface["ip_address"]
2033
2034 if iface.get("mac_address"):
2035 update_dict[
2036 path_interfaces + ".{}.".format(i) + "mac-address"
2037 ] = iface["mac_address"]
2038
2039 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2040 update_dict["ip-address"] = iface.get("ip_address").split(
2041 ";"
2042 )[0]
2043
2044 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2045 update_dict[path_item + ".ip-address"] = iface.get(
2046 "ip_address"
2047 ).split(";")[0]
2048
2049 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2050 else:
2051 update_dict = {path_item + ".status": "DELETED"}
2052 self.db.set_one(
2053 table,
2054 q_filter={"_id": _id},
2055 update_dict=update_dict,
2056 unset={path_vim_status: None},
2057 )
2058
2059 def _process_delete_db_tasks(self):
2060 """
2061 Delete task from database because vnfrs or nsrs or both have been deleted
2062 :return: None. Uses and modify self.tasks_to_delete
2063 """
2064 while self.tasks_to_delete:
2065 task = self.tasks_to_delete[0]
2066 vnfrs_deleted = None
2067 nsr_id = task["nsr_id"]
2068
2069 if task["target_record"].startswith("vnfrs:"):
2070 # check if nsrs is present
2071 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2072 vnfrs_deleted = task["target_record"].split(":")[1]
2073
2074 try:
2075 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2076 except Exception as e:
2077 self.logger.error(
2078 "Error deleting task={}: {}".format(task["task_id"], e)
2079 )
2080 self.tasks_to_delete.pop(0)
2081
2082 @staticmethod
2083 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2084 """
2085 Static method because it is called from osm_ng_ro.ns
2086 :param db: instance of database to use
2087 :param nsr_id: affected nsrs id
2088 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2089 :return: None, exception is fails
2090 """
2091 retries = 5
2092 for retry in range(retries):
2093 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2094 now = time.time()
2095 conflict = False
2096
2097 for ro_task in ro_tasks:
2098 db_update = {}
2099 to_delete_ro_task = True
2100
2101 for index, task in enumerate(ro_task["tasks"]):
2102 if not task:
2103 pass
2104 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2105 vnfrs_deleted
2106 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2107 ):
2108 db_update["tasks.{}".format(index)] = None
2109 else:
2110 # used by other nsr, ro_task cannot be deleted
2111 to_delete_ro_task = False
2112
2113 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2114 if to_delete_ro_task:
2115 if not db.del_one(
2116 "ro_tasks",
2117 q_filter={
2118 "_id": ro_task["_id"],
2119 "modified_at": ro_task["modified_at"],
2120 },
2121 fail_on_empty=False,
2122 ):
2123 conflict = True
2124 elif db_update:
2125 db_update["modified_at"] = now
2126 if not db.set_one(
2127 "ro_tasks",
2128 q_filter={
2129 "_id": ro_task["_id"],
2130 "modified_at": ro_task["modified_at"],
2131 },
2132 update_dict=db_update,
2133 fail_on_empty=False,
2134 ):
2135 conflict = True
2136 if not conflict:
2137 return
2138 else:
2139 raise NsWorkerException("Exceeded {} retries".format(retries))
2140
2141 def run(self):
2142 # load database
2143 self.logger.info("Starting")
2144 while True:
2145 # step 1: get commands from queue
2146 try:
2147 if self.vim_targets:
2148 task = self.task_queue.get(block=False)
2149 else:
2150 if not self.idle:
2151 self.logger.debug("enters in idle state")
2152 self.idle = True
2153 task = self.task_queue.get(block=True)
2154 self.idle = False
2155
2156 if task[0] == "terminate":
2157 break
2158 elif task[0] == "load_vim":
2159 self.logger.info("order to load vim {}".format(task[1]))
2160 self._load_vim(task[1])
2161 elif task[0] == "unload_vim":
2162 self.logger.info("order to unload vim {}".format(task[1]))
2163 self._unload_vim(task[1])
2164 elif task[0] == "reload_vim":
2165 self._reload_vim(task[1])
2166 elif task[0] == "check_vim":
2167 self.logger.info("order to check vim {}".format(task[1]))
2168 self._check_vim(task[1])
2169 continue
2170 except Exception as e:
2171 if isinstance(e, queue.Empty):
2172 pass
2173 else:
2174 self.logger.critical(
2175 "Error processing task: {}".format(e), exc_info=True
2176 )
2177
2178 # step 2: process pending_tasks, delete not needed tasks
2179 try:
2180 if self.tasks_to_delete:
2181 self._process_delete_db_tasks()
2182 busy = False
2183 ro_task = self._get_db_task()
2184 if ro_task:
2185 self._process_pending_tasks(ro_task)
2186 busy = True
2187 if not busy:
2188 time.sleep(5)
2189 except Exception as e:
2190 self.logger.critical(
2191 "Unexpected exception at run: " + str(e), exc_info=True
2192 )
2193
2194 self.logger.info("Finishing")