1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is something to avoid if RO
is migrated to a non-relational database such as MongoDB. Each vim_wim_actions references a different instance_Xxxxx.
In this case the "related" column contains the same value, to know they refer to the same vim. In case of deletion, if
there are related tasks using this element, it is not deleted. The vim_info needed to delete is transferred to another task
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
SUPERSEDED: similar to FINISHED, but nothing has been done to complete the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
can contain an int (single index on the same instance-action) or str (complete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
iface_id: uuid of instance_interfaces
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
same among several NS or instance-scenarios
75 MD worker: Used to lock in case of several thread workers.
83 from osm_ro
import vimconn
84 from osm_ro
.wim
.sdnconn
import SdnConnectorError
86 from osm_ro
.db_base
import db_base_Exception
87 from http
import HTTPStatus
88 from copy
import deepcopy
# Module authorship metadata
__author__ = "Alfonso Tierno, Pablo Montes"
__date__ = "$28-Sep-2017 12:07:15$"
def is_task_id(task_id):
    """Tell whether a value is a reference to another task, i.e. a "TASK-..." string.

    :param task_id: candidate value, e.g. the "net_id" of a VM interface
    :return: True if task_id is a string that starts with "TASK-"; False otherwise,
        including for non-string values such as None or an integer index
    """
    # The isinstance guard avoids an AttributeError when the value is not a string
    return isinstance(task_id, str) and task_id.startswith("TASK-")
class VimThreadException(Exception):
    """Error raised by vim_thread while processing a task.

    In this file it is raised, for instance, when a task that another task depends on
    cannot be found or has failed; the task being processed is then marked as FAILED.
    """
class VimThreadExceptionNotFound(VimThreadException):
    """Specialization of VimThreadException.

    NOTE(review): presumably signals that a referenced element was not found; no use of
    this class is visible in this part of the file - confirm against its raisers.
    """
106 class vim_thread(threading
.Thread
):
107 REFRESH_BUILD
= 5 # 5 seconds
108 REFRESH_ACTIVE
= 60 # 1 minute
110 REFRESH_DELETE
= 3600 * 10
112 def __init__(self
, task_lock
, plugins
, name
=None, wim_account_id
=None, datacenter_tenant_id
=None, db
=None):
116 'name' name of thread
117 'host','user': host ip or name to manage and user
118 'db', 'db_lock': database class and lock to use it in exclusion
120 threading
.Thread
.__init
__(self
)
121 self
.plugins
= plugins
122 self
.plugin_name
= "unknown"
124 self
.sdnconnector
= None
125 self
.sdnconn_config
= None
126 self
.error_status
= None
127 self
.wim_account_id
= wim_account_id
128 self
.datacenter_tenant_id
= datacenter_tenant_id
129 self
.port_mapping
= None
130 if self
.wim_account_id
:
131 self
.target_k
= "wim_account_id"
132 self
.target_v
= self
.wim_account_id
134 self
.target_k
= "datacenter_vim_id"
135 self
.target_v
= self
.datacenter_tenant_id
137 self
.name
= wim_account_id
or str(datacenter_tenant_id
)
140 self
.vim_persistent_info
= {}
141 self
.my_id
= self
.name
[:64]
143 self
.logger
= logging
.getLogger('openmano.{}.{}'.format("vim" if self
.datacenter_tenant_id
else "sdn",
147 self
.task_lock
= task_lock
148 self
.task_queue
= queue
.Queue(2000)
def _proccess_sdn_exception(self, exc):
    """Turn any exception coming from the SDN plugin into an SdnConnectorError.

    Must be called while an exception is being handled (inside an ``except`` block),
    because the expected exception type is propagated with a bare ``raise``.

    :param exc: exception caught while calling the SDN connector plugin
    :raises SdnConnectorError: always; the one being handled if exc already has that
        type, otherwise a new one with HTTP code 500 chained to exc
    """
    if isinstance(exc, SdnConnectorError):
        # The plugin raised the expected type: let it go up untouched
        raise

    # Any other type is unexpected from a plugin: log it with traceback and wrap it
    log_text = "plugin={} throws a non SdnConnectorError exception {}".format(self.plugin_name, exc)
    self.logger.error(log_text, exc_info=True)
    raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
def _proccess_vim_exception(self, exc):
    """Turn any exception coming from the VIM plugin into a vimconn.vimconnException.

    Must be called while an exception is being handled (inside an ``except`` block),
    because the expected exception type is propagated with a bare ``raise``.

    :param exc: exception caught while calling the VIM connector plugin
    :raises vimconn.vimconnException: always; the one being handled if exc already has
        that type, otherwise a new one with HTTP code 500 chained to exc
    """
    if isinstance(exc, vimconn.vimconnException):
        # The plugin raised the expected type: let it go up untouched
        raise

    # Any other type is unexpected from a plugin: log it with traceback and wrap it
    log_text = "plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc)
    self.logger.error(log_text, exc_info=True)
    raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
166 def get_vim_sdn_connector(self
):
167 if self
.datacenter_tenant_id
:
169 from_
= "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
170 select_
= ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
171 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
172 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
173 'user', 'passwd', 'dt.config as dt_config')
174 where_
= {"dt.uuid": self
.datacenter_tenant_id
}
175 vims
= self
.db
.get_rows(FROM
=from_
, SELECT
=select_
, WHERE
=where_
)
179 vim_config
.update(yaml
.load(vim
["config"], Loader
=yaml
.Loader
))
181 vim_config
.update(yaml
.load(vim
["dt_config"], Loader
=yaml
.Loader
))
182 vim_config
['datacenter_tenant_id'] = vim
.get('datacenter_tenant_id')
183 vim_config
['datacenter_id'] = vim
.get('datacenter_id')
186 # vim_port_mappings = self.ovim.get_of_port_mappings(
187 # db_filter={"datacenter_id": vim_config['datacenter_id']})
188 # vim_config["wim_external_ports"] = [x for x in vim_port_mappings
189 # if x["service_mapping_info"].get("wim")]
190 self
.plugin_name
= "rovim_" + vim
["type"]
191 self
.vim
= self
.plugins
[self
.plugin_name
].vimconnector(
192 uuid
=vim
['datacenter_id'], name
=vim
['datacenter_name'],
193 tenant_id
=vim
['vim_tenant_id'], tenant_name
=vim
['vim_tenant_name'],
194 url
=vim
['vim_url'], url_admin
=vim
['vim_url_admin'],
195 user
=vim
['user'], passwd
=vim
['passwd'],
196 config
=vim_config
, persistent_info
=self
.vim_persistent_info
198 self
.error_status
= None
199 self
.logger
.info("Vim Connector loaded for vim_account={}, plugin={}".format(
200 self
.datacenter_tenant_id
, self
.plugin_name
))
201 except Exception as e
:
202 self
.logger
.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format(
203 self
.datacenter_tenant_id
, self
.plugin_name
, e
))
205 self
.error_status
= "Error loading vimconnector: {}".format(e
)
208 wim_account
= self
.db
.get_rows(FROM
="wim_accounts", WHERE
={"uuid": self
.wim_account_id
})[0]
209 wim
= self
.db
.get_rows(FROM
="wims", WHERE
={"uuid": wim_account
["wim_id"]})[0]
211 self
.sdnconn_config
= yaml
.load(wim
["config"], Loader
=yaml
.Loader
)
213 self
.sdnconn_config
= {}
214 if wim_account
["config"]:
215 self
.sdnconn_config
.update(yaml
.load(wim_account
["config"], Loader
=yaml
.Loader
))
216 self
.port_mappings
= self
.db
.get_rows(FROM
="wim_port_mappings", WHERE
={"wim_id": wim_account
["wim_id"]})
217 if self
.port_mappings
:
218 self
.sdnconn_config
["service_endpoint_mapping"] = self
.port_mappings
219 self
.plugin_name
= "rosdn_" + wim
["type"]
220 self
.sdnconnector
= self
.plugins
[self
.plugin_name
](
221 wim
, wim_account
, config
=self
.sdnconn_config
)
222 self
.error_status
= None
223 self
.logger
.info("Sdn Connector loaded for wim_account={}, plugin={}".format(
224 self
.wim_account_id
, self
.plugin_name
))
225 except Exception as e
:
226 self
.logger
.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format(
227 self
.wim_account_id
, self
.plugin_name
, e
))
228 self
.sdnconnector
= None
229 self
.error_status
= "Error loading sdn connector: {}".format(e
)
231 def _get_db_task(self
):
233 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
241 # get 20 (database_limit) entries each time
242 vim_actions
= self
.db
.get_rows(FROM
="vim_wim_actions",
243 WHERE
={self
.target_k
: self
.target_v
,
244 "status": ['SCHEDULED', 'BUILD', 'DONE'],
245 "worker": [None, self
.my_id
], "modified_at<=": now
247 ORDER_BY
=("modified_at", "created_at",),
248 LIMIT
=database_limit
)
251 # if vim_actions[0]["modified_at"] > now:
252 # return int(vim_actions[0] - now)
253 for task
in vim_actions
:
255 if task_related
== task
["related"]:
256 continue # ignore if a locking has already tried for these task set
257 task_related
= task
["related"]
259 self
.db
.update_rows("vim_wim_actions", UPDATE
={"worker": self
.my_id
}, modified_time
=0,
260 WHERE
={self
.target_k
: self
.target_v
,
261 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
262 "worker": [None, self
.my_id
],
263 "related": task_related
,
264 "item": task
["item"],
266 # ... and read all related and check if locked
267 related_tasks
= self
.db
.get_rows(FROM
="vim_wim_actions",
268 WHERE
={self
.target_k
: self
.target_v
,
269 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
270 "related": task_related
,
271 "item": task
["item"],
273 ORDER_BY
=("created_at",))
274 # check that all related tasks have been locked. If not release and try again. It can happen
275 # for race conditions if a new related task has been inserted by nfvo in the process
276 some_tasks_locked
= False
277 some_tasks_not_locked
= False
279 for relate_task
in related_tasks
:
280 if relate_task
["worker"] != self
.my_id
:
281 some_tasks_not_locked
= True
283 some_tasks_locked
= True
284 if not creation_task
and relate_task
["action"] in ("CREATE", "FIND"):
285 creation_task
= relate_task
286 if some_tasks_not_locked
:
287 if some_tasks_locked
: # unlock
288 self
.db
.update_rows("vim_wim_actions", UPDATE
={"worker": None}, modified_time
=0,
289 WHERE
={self
.target_k
: self
.target_v
,
290 "worker": self
.my_id
,
291 "related": task_related
,
292 "item": task
["item"],
296 # task of creation must be the first in the list of related_task
297 assert(related_tasks
[0]["action"] in ("CREATE", "FIND"))
299 task
["params"] = None
301 extra
= yaml
.load(task
["extra"], Loader
=yaml
.Loader
)
304 task
["extra"] = extra
305 if extra
.get("depends_on"):
307 if extra
.get("params"):
308 task
["params"] = deepcopy(extra
["params"])
309 return task
, related_tasks
310 except Exception as e
:
311 self
.logger
.critical("Unexpected exception at _get_db_task: " + str(e
), exc_info
=True)
314 def _delete_task(self
, task
):
316 Determine if this task need to be done or superseded
320 def copy_extra_created(copy_to
, copy_from
):
321 copy_to
["created"] = copy_from
["created"]
322 if copy_from
.get("sdn_net_id"):
323 copy_to
["sdn_net_id"] = copy_from
["sdn_net_id"]
324 if copy_from
.get("interfaces"):
325 copy_to
["interfaces"] = copy_from
["interfaces"]
326 if copy_from
.get("created_items"):
327 if not copy_to
.get("created_items"):
328 copy_to
["created_items"] = {}
329 copy_to
["created_items"].update(copy_from
["created_items"])
332 dependency_task
= None
333 deletion_needed
= False
334 if task
["status"] == "FAILED":
335 return # TODO need to be retry??
337 # get all related tasks
338 related_tasks
= self
.db
.get_rows(FROM
="vim_wim_actions",
339 WHERE
={self
.target_k
: self
.target_v
,
340 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
341 "action": ["FIND", "CREATE"],
342 "related": task
["related"],
344 ORDER_BY
=("created_at",),
346 for related_task
in related_tasks
:
347 if related_task
["item"] == task
["item"] and related_task
["item_id"] == task
["item_id"]:
348 task_create
= related_task
350 if related_task
["extra"]:
351 extra_created
= yaml
.load(related_task
["extra"], Loader
=yaml
.Loader
)
352 if extra_created
.get("created"):
353 deletion_needed
= True
354 related_task
["extra"] = extra_created
355 elif not dependency_task
:
356 dependency_task
= related_task
357 if task_create
and dependency_task
:
360 # mark task_create as FINISHED
361 self
.db
.update_rows("vim_wim_actions", UPDATE
={"status": "FINISHED"},
362 WHERE
={self
.target_k
: self
.target_v
,
363 "instance_action_id": task_create
["instance_action_id"],
364 "task_index": task_create
["task_index"]
366 if not deletion_needed
:
368 elif dependency_task
:
369 # move create information from task_create to relate_task
370 extra_new_created
= yaml
.load(dependency_task
["extra"], Loader
=yaml
.Loader
) or {}
371 extra_new_created
["created"] = extra_created
["created"]
372 copy_extra_created(copy_to
=extra_new_created
, copy_from
=extra_created
)
374 self
.db
.update_rows("vim_wim_actions",
375 UPDATE
={"extra": yaml
.safe_dump(extra_new_created
, default_flow_style
=True,
377 "vim_id": task_create
.get("vim_id")},
378 WHERE
={self
.target_k
: self
.target_v
,
379 "instance_action_id": dependency_task
["instance_action_id"],
380 "task_index": dependency_task
["task_index"]
384 task
["vim_id"] = task_create
["vim_id"]
385 copy_extra_created(copy_to
=task
["extra"], copy_from
=task_create
["extra"])
388 except Exception as e
:
389 self
.logger
.critical("Unexpected exception at _delete_task: " + str(e
), exc_info
=True)
def _refres_vm(self, task):
    """Call VIM to get the status of the VM of this task and sync its interfaces.

    For each interface reported by the VIM whose content differs from the one kept at
    task["extra"]["interfaces"], the matching row of table instance_interfaces is updated
    and the new content is stored in the task. After that, task["extra"]["vim_status"],
    task["extra"]["vim_info"] and task["error_msg"] are updated in place if they changed.

    :param task: task dictionary; reads "vim_id", "instance_action_id", "task_index",
        "extra" (with "interfaces") and "error_msg"
    :return: dictionary with the detected changes ("status", "error_msg" and, when the
        VIM provides it, "vim_info"), or None when nothing changed
    """
    database_update = None

    vim_id = task["vim_id"]
    vm_to_refresh_list = [vim_id]
    try:
        vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        vim_info = vim_dict[vim_id]
    except vimconn.vimconnException as e:
        # The VIM call failed: report it as the status of the element
        self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

    # check and update interfaces
    # task_warning_msg is never filled at present: the legacy ovim code that deleted and
    # re-created the external SDN port (and appended its errors here) is disabled.
    task_warning_msg = ""
    for interface in vim_info.get("interfaces", ()):
        vim_interface_id = interface["vim_interface_id"]
        if vim_interface_id not in task["extra"]["interfaces"]:
            # No exception is being handled at this point, so exc_info is not passed
            # (it would only log a meaningless "NoneType: None" traceback).
            self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                task_id, vim_interface_id, task["extra"]["interfaces"]))
            continue
        task_interface = task["extra"]["interfaces"][vim_interface_id]
        task_vim_interface = task_interface.get("vim_info")
        if task_vim_interface != interface:
            # TODO: send a message to the SDN task so that it updates the port
            # update database
            self.db.update_rows('instance_interfaces',
                                UPDATE={"mac_address": interface.get("mac_address"),
                                        "ip_address": interface.get("ip_address"),
                                        "vim_interface_id": interface.get("vim_interface_id"),
                                        "vim_info": interface.get("vim_info"),
                                        "sdn_port_id": task_interface.get("sdn_port_id"),
                                        "compute_node": interface.get("compute_node"),
                                        "pci": interface.get("pci"),
                                        "vlan": interface.get("vlan")},
                                WHERE={'uuid': task_interface["iface_id"]})
            task_interface["vim_info"] = interface

    # check and update task and instance_vms database
    vim_info_error_msg = None
    if vim_info.get("error_msg"):
        vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
    elif task_warning_msg:
        vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
    task_vim_info = task["extra"].get("vim_info")
    task_error_msg = task.get("error_msg")
    task_vim_status = task["extra"].get("vim_status")
    if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
            (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
        database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
        if vim_info.get("vim_info"):
            database_update["vim_info"] = vim_info["vim_info"]

        # keep the task in memory aligned with what is going to be stored
        task["extra"]["vim_status"] = vim_info["status"]
        task["error_msg"] = vim_info_error_msg
        if vim_info.get("vim_info"):
            task["extra"]["vim_info"] = vim_info["vim_info"]

    return database_update
def _refres_net(self, task):
    """Call VIM to get the status of the network of this task.

    When the VIM reports a change, task["extra"]["vim_status"], task["extra"]["vim_info"]
    and task["error_msg"] are updated in place.

    :param task: task dictionary; reads "vim_id", "instance_action_id", "task_index",
        "extra" and "error_msg"
    :return: dictionary with the detected changes ("status", "error_msg" and, when the
        VIM provides it, "vim_info"), or None when nothing changed
    """
    net_vim_id = task["vim_id"]
    try:
        vim_info = self.vim.refresh_nets_status([net_vim_id])[net_vim_id]
    except vimconn.vimconnException as e:
        # The VIM call failed: report it as the status of the element
        self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))

    # Values currently known by the task
    extra = task["extra"]
    previous_vim_info = extra.get("vim_info")
    previous_status = extra.get("vim_status")
    previous_error = task.get("error_msg")

    # Values just reported by the VIM (the legacy ovim SDN status merge is disabled)
    new_status = vim_info["status"]
    new_error = vim_info.get("error_msg")
    if new_error:
        new_error = self._format_vim_error_msg(new_error)
    new_vim_info = vim_info.get("vim_info")

    unchanged = (previous_status == new_status and previous_error == new_error and
                 not (new_vim_info and previous_vim_info != new_vim_info))
    if unchanged:
        return None

    # Something changed: update the task in memory and report what must be stored
    extra["vim_status"] = new_status
    task["error_msg"] = new_error
    database_update = {"status": new_status, "error_msg": new_error}
    if new_vim_info:
        extra["vim_info"] = new_vim_info
        database_update["vim_info"] = new_vim_info
    return database_update
547 def _proccess_pending_tasks(self
, task
, related_tasks
):
548 old_task_status
= task
["status"]
549 create_or_find
= False # if as result of processing this task something is created or found
553 if task
["status"] == "SCHEDULED":
554 # check if tasks that this depends on have been completed
555 dependency_not_completed
= False
556 dependency_modified_at
= 0
557 for task_index
in task
["extra"].get("depends_on", ()):
558 task_dependency
= self
._look
_for
_task
(task
["instance_action_id"], task_index
)
559 if not task_dependency
:
560 raise VimThreadException(
561 "Cannot get depending net task trying to get depending task {}.{}".format(
562 task
["instance_action_id"], task_index
))
563 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
564 # database must be look again
565 if task_dependency
["status"] == "SCHEDULED":
566 dependency_not_completed
= True
567 dependency_modified_at
= task_dependency
["modified_at"]
569 elif task_dependency
["status"] == "FAILED":
570 raise VimThreadException(
571 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
572 task
["action"], task
["item"],
573 task
["instance_action_id"], task
["task_index"],
574 task_dependency
["instance_action_id"], task_dependency
["task_index"],
575 task_dependency
["action"], task_dependency
["item"], task_dependency
.get("error_msg")))
577 task
["depends"]["TASK-"+str(task_index
)] = task_dependency
578 task
["depends"]["TASK-{}.{}".format(task
["instance_action_id"], task_index
)] = task_dependency
579 if dependency_not_completed
:
580 # Move this task to the time dependency is going to be modified plus 10 seconds.
581 self
.db
.update_rows("vim_wim_actions", modified_time
=dependency_modified_at
+ 10,
582 UPDATE
={"worker": None},
583 WHERE
={self
.target_k
: self
.target_v
, "worker": self
.my_id
,
584 "related": task
["related"],
586 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
587 # if task["extra"]["tries"] > 3:
588 # raise VimThreadException(
589 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
590 # "(task {}.{})".format(task["action"], task["item"],
591 # task["instance_action_id"], task["task_index"],
592 # task_dependency["instance_action_id"], task_dependency["task_index"]
593 # task_dependency["action"], task_dependency["item"]))
596 database_update
= None
597 if task
["action"] == "DELETE":
598 deleted_needed
= self
._delete
_task
(task
)
599 if not deleted_needed
:
600 task
["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
601 task
["error_msg"] = None
603 if task
["status"] == "SUPERSEDED":
604 # not needed to do anything but update database with the new status
605 database_update
= None
606 elif not self
.vim
and not self
.sdnconnector
:
607 task
["status"] = "FAILED"
608 task
["error_msg"] = self
.error_status
609 database_update
= {"status": "VIM_ERROR" if self
.datacenter_tenant_id
else "WIM_ERROR",
610 "error_msg": task
["error_msg"]}
611 elif task
["item_id"] != related_tasks
[0]["item_id"] and task
["action"] in ("FIND", "CREATE"):
612 # Do nothing, just copy values from one to another and update database
613 task
["status"] = related_tasks
[0]["status"]
614 task
["error_msg"] = related_tasks
[0]["error_msg"]
615 task
["vim_id"] = related_tasks
[0]["vim_id"]
616 extra
= yaml
.load(related_tasks
[0]["extra"], Loader
=yaml
.Loader
)
617 task
["extra"]["vim_status"] = extra
.get("vim_status")
618 next_refresh
= related_tasks
[0]["modified_at"] + 0.001
619 database_update
= {"status": task
["extra"].get("vim_status", "VIM_ERROR"),
620 "error_msg": task
["error_msg"]}
621 if task
["item"] == 'instance_vms':
622 database_update
["vim_vm_id"] = task
["vim_id"]
623 elif task
["item"] == 'instance_nets':
624 database_update
["vim_net_id"] = task
["vim_id"]
625 elif task
["item"] == 'instance_vms':
626 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
627 database_update
= self
._refres
_vm
(task
)
628 create_or_find
= True
629 elif task
["action"] == "CREATE":
630 create_or_find
= True
631 database_update
= self
.new_vm(task
)
632 elif task
["action"] == "DELETE":
635 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
636 elif task
["item"] == 'instance_nets':
637 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
638 database_update
= self
._refres
_net
(task
)
639 create_or_find
= True
640 elif task
["action"] == "CREATE":
641 create_or_find
= True
642 database_update
= self
.new_net(task
)
643 elif task
["action"] == "DELETE":
645 elif task
["action"] == "FIND":
646 database_update
= self
.get_net(task
)
648 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
649 elif task
["item"] == 'instance_wim_nets':
650 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
651 database_update
= self
.new_or_update_sdn_net(task
)
652 create_or_find
= True
653 elif task
["action"] == "CREATE":
654 create_or_find
= True
655 database_update
= self
.new_or_update_sdn_net(task
)
656 elif task
["action"] == "DELETE":
657 self
.del_sdn_net(task
)
658 elif task
["action"] == "FIND":
659 database_update
= self
.get_sdn_net(task
)
661 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
662 elif task
["item"] == 'instance_sfis':
663 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
664 database_update
= self
._refres
_sfis
(task
)
665 create_or_find
= True
666 elif task
["action"] == "CREATE":
667 create_or_find
= True
668 database_update
= self
.new_sfi(task
)
669 elif task
["action"] == "DELETE":
672 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
673 elif task
["item"] == 'instance_sfs':
674 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
675 database_update
= self
._refres
_sfs
(task
)
676 create_or_find
= True
677 elif task
["action"] == "CREATE":
678 create_or_find
= True
679 database_update
= self
.new_sf(task
)
680 elif task
["action"] == "DELETE":
683 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
684 elif task
["item"] == 'instance_classifications':
685 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
686 database_update
= self
._refres
_classifications
(task
)
687 create_or_find
= True
688 elif task
["action"] == "CREATE":
689 create_or_find
= True
690 database_update
= self
.new_classification(task
)
691 elif task
["action"] == "DELETE":
692 self
.del_classification(task
)
694 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
695 elif task
["item"] == 'instance_sfps':
696 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
697 database_update
= self
._refres
_sfps
(task
)
698 create_or_find
= True
699 elif task
["action"] == "CREATE":
700 create_or_find
= True
701 database_update
= self
.new_sfp(task
)
702 elif task
["action"] == "DELETE":
705 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
707 raise vimconn
.vimconnException(self
.name
+ "unknown task item {}".format(task
["item"]))
709 except VimThreadException
as e
:
710 task
["error_msg"] = str(e
)
711 task
["status"] = "FAILED"
712 database_update
= {"status": "VIM_ERROR", "error_msg": task
["error_msg"]}
713 if task
["item"] == 'instance_vms':
714 database_update
["vim_vm_id"] = None
715 elif task
["item"] == 'instance_nets':
716 database_update
["vim_net_id"] = None
718 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
719 self
.logger
.debug("task={} item={} action={} result={}:'{}' params={}".format(
720 task_id
, task
["item"], task
["action"], task
["status"],
721 task
["vim_id"] if task
["status"] == "DONE" else task
.get("error_msg"), task
["params"]))
724 if task
["status"] == "DONE":
725 next_refresh
= time
.time()
726 if task
["extra"].get("vim_status") == "BUILD":
727 next_refresh
+= self
.REFRESH_BUILD
728 elif task
["extra"].get("vim_status") in ("ERROR", "VIM_ERROR", "WIM_ERROR"):
729 next_refresh
+= self
.REFRESH_ERROR
730 elif task
["extra"].get("vim_status") == "DELETED":
731 next_refresh
+= self
.REFRESH_DELETE
733 next_refresh
+= self
.REFRESH_ACTIVE
734 elif task
["status"] == "FAILED":
735 next_refresh
= time
.time() + self
.REFRESH_DELETE
738 # modify all related task with action FIND/CREATED non SCHEDULED
740 table
="vim_wim_actions", modified_time
=next_refresh
+ 0.001,
741 UPDATE
={"status": task
["status"], "vim_id": task
.get("vim_id"),
742 "error_msg": task
["error_msg"],
745 WHERE
={self
.target_k
: self
.target_v
,
746 "worker": self
.my_id
,
747 "action": ["FIND", "CREATE"],
748 "related": task
["related"],
749 "status<>": "SCHEDULED",
753 table
="vim_wim_actions", modified_time
=next_refresh
,
754 UPDATE
={"status": task
["status"], "vim_id": task
.get("vim_id"),
755 "error_msg": task
["error_msg"],
756 "extra": yaml
.safe_dump(task
["extra"], default_flow_style
=True, width
=256)},
757 WHERE
={"instance_action_id": task
["instance_action_id"], "task_index": task
["task_index"]})
760 table
="vim_wim_actions", modified_time
=0,
761 UPDATE
={"worker": None},
762 WHERE
={self
.target_k
: self
.target_v
,
763 "worker": self
.my_id
,
764 "related": task
["related"],
767 # Update table instance_actions
768 if old_task_status
== "SCHEDULED" and task
["status"] != old_task_status
:
770 table
="instance_actions",
771 UPDATE
={("number_failed" if task
["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
772 WHERE
={"uuid": task
["instance_action_id"]})
774 where_filter
= {"related": task
["related"]}
775 if task
["item"] == "instance_nets" and task
["datacenter_vim_id"]:
776 where_filter
["datacenter_tenant_id"] = task
["datacenter_vim_id"]
777 self
.db
.update_rows(table
=task
["item"],
778 UPDATE
=database_update
,
780 except db_base_Exception
as e
:
781 self
.logger
.error("task={} Error updating database {}".format(task_id
, e
), exc_info
=True)
783 def insert_task(self
, task
):
785 self
.task_queue
.put(task
, False)
788 raise vimconn
.vimconnException(self
.name
+ ": timeout inserting a task")
790 def del_task(self
, task
):
792 if task
["status"] == "SCHEDULED":
793 task
["status"] = "SUPERSEDED"
795 else: # task["status"] == "processing"
796 self
.task_lock
.release()
800 self
.logger
.debug("Starting")
802 self
.get_vim_sdn_connector()
803 self
.logger
.debug("Vimconnector loaded")
804 reload_thread
= False
808 while not self
.task_queue
.empty():
809 task
= self
.task_queue
.get()
810 if isinstance(task
, list):
812 elif isinstance(task
, str):
815 elif task
== 'reload':
818 self
.task_queue
.task_done()
822 task
, related_tasks
= self
._get
_db
_task
()
824 self
._proccess
_pending
_tasks
(task
, related_tasks
)
828 except Exception as e
:
829 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
831 self
.logger
.debug("Finishing")
833 def _look_for_task(self
, instance_action_id
, task_id
):
835 Look for a concrete task at vim_actions database table
836 :param instance_action_id: The instance_action_id
837 :param task_id: Can have several formats:
838 <task index>: integer
839 TASK-<task index> :backward compatibility,
840 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
841 :return: Task dictionary or None if not found
843 if isinstance(task_id
, int):
846 if task_id
.startswith("TASK-"):
847 task_id
= task_id
[5:]
848 ins_action_id
, _
, task_index
= task_id
.rpartition(".")
850 instance_action_id
= ins_action_id
852 tasks
= self
.db
.get_rows(FROM
="vim_wim_actions", WHERE
={"instance_action_id": instance_action_id
,
853 "task_index": task_index
})
857 task
["params"] = None
860 extra
= yaml
.load(task
["extra"], Loader
=yaml
.Loader
)
861 task
["extra"] = extra
862 task
["params"] = extra
.get("params")
def _format_vim_error_msg(error_text, max_length=1024):
    """Shorten an error text so that it does not exceed max_length characters.

    Long texts keep their beginning and their end, joined by " ... ".
    NOTE(review): there is no self parameter although callers in this file use
    self._format_vim_error_msg(text); presumably a @staticmethod decorator precedes
    this definition - confirm.

    :param error_text: text to shorten; None or an empty string is returned as is
    :param max_length: a text of this length or longer is shortened
    :return: error_text itself when it is shorter than max_length, otherwise the
        shortened text
    """
    if not error_text or len(error_text) < max_length:
        return error_text

    head = max_length // 2 - 3
    tail = -(-max_length // 2 + 3)  # same rounding as slicing with [-max_length // 2 + 3:]
    if head < 0 or tail <= 0:
        # max_length is too small to hold both ends plus the " ... " marker; slicing with
        # these indexes would return more text than requested, so just truncate.
        return error_text[:max(max_length, 0)]
    return error_text[:head] + " ... " + error_text[-tail:]
def new_vm(self, task):
    """
    Create a VM at VIM.

    task["params"] is the positional argument list for vim.new_vminstance; its item 5 is the list of
    interfaces. An interface "net_id" that is a task id is replaced by the "vim_id" of the task it depends on.
    After creation, the interfaces are stored at task["extra"]["interfaces"], indexed by the interface vim_id,
    together with the 'sdn_net_id' and 'interface_id' found at database.

    :param task: task dictionary. It is modified with the result: "status", "vim_id", "error_msg", "extra"
    :return: dictionary with the content to update the instance element at database
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends")
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                network_id = task["depends"][net["net_id"]].get("vim_id")
                if not network_id:
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(depends[net["net_id"]]["error_msg"]))
                net["net_id"] = network_id
        # the connector receives a copy, so whatever it writes there does not end up at task["params"]
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            iface_info = {"iface_id": iface["uuid"]}
            task_interfaces[iface["vim_id"]] = iface_info
            result = self.db.get_rows(
                SELECT=('sdn_net_id', 'interface_id'),
                FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                WHERE={'ii.uuid': iface["uuid"]})
            if result:
                iface_info["sdn_net_id"] = result[0]['sdn_net_id']
                iface_info["interface_id"] = result[0]['interface_id']
            else:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(
                    task_id, iface["uuid"]))

        task["vim_info"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["extra"]["vim_status"] = "BUILD"
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return instance_element_update
def del_vm(self, task):
    """
    Delete a VM at VIM, together with the items stored at task["extra"]["created_items"].

    :param task: task dictionary. "status" and "error_msg" are modified with the result. A VM that is not found
        at VIM is considered deleted, but "error_msg" keeps the reason
    :return: None
    """
    vm_vim_id = task["vim_id"]
    try:
        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        else:
            task["status"] = "FAILED"
        return None
    task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
    task["error_msg"] = None
    return None
def _get_net_internal(self, task, filter_param):
    """
    Common code for get_net and new_net. It looks for a network on VIM with the filter_params
    :param task: task for this find or find-or-create action. It is updated with the network found
    :param filter_param: parameters to send to the vimconnector
    :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
        when network is not found or found more than one
    """
    vim_nets = self.vim.get_network_list(filter_param)
    if not vim_nets:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    elif len(vim_nets) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = vim_nets[0]["id"]

    # Discover if this network is managed by a sdn controller: take the sdn_net_id of the first instance_nets
    # row of this datacenter tenant that already uses the same VIM network
    sdn_net_id = None
    result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                              WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
                              ORDER="instance_scenario_id")
    if result:
        sdn_net_id = result[0]['sdn_net_id']

    task["status"] = "DONE"
    task["extra"]["vim_info"] = {}
    task["extra"]["created"] = False
    task["extra"]["vim_status"] = "BUILD"
    task["extra"]["sdn_net_id"] = sdn_net_id
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
                               "error_msg": None, "sdn_net_id": sdn_net_id}
    return instance_element_update
def get_net(self, task):
    """
    Find an existing network at VIM using task["params"][0] as the filter.

    :param task: task dictionary. On error it is marked as "FAILED", with "vim_id" None and "error_msg" filled
    :return: dictionary with the content to update the instance_nets database table
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        filter_param = task["params"][0]
        return self._get_net_internal(task, filter_param)

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} get-net: {}".format(task_id, e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        task["error_msg"] = self._format_vim_error_msg(str(e))
        instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return instance_element_update
def new_net(self, task):
    """
    Find-or-create a network at VIM.

    When task["extra"]["find"] is present, its first item is used as a filter to look for an existing network.
    If that network is not found (or there is nothing to find) a new network is created using the first five
    items of task["params"].

    :param task: task dictionary. It is modified with the result: "status", "vim_id", "error_msg", "extra"
    :return: dictionary with the content to update the instance_nets database table
    """
    # both are read at the except clause, so they need a value before anything can fail
    vim_net_id = None
    action_text = ""
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                return self._get_net_internal(task, filter_param)
            except VimThreadExceptionNotFound:
                pass  # not found: go on and create it
            # NOTE(review): other VimThreadException raised by the search (e.g. more than one network found)
            # are not caught by this method, while get_net handles them - confirm this is intended

        # CREATE
        params = task["params"]
        action_text = "creating VIM"

        vim_net_id, created_items = self.vim.new_network(*params[0:5])

        # TODO fix at nfvo adding external port

        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["vim_status"] = "BUILD"
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return instance_element_update
    except vimconn.vimconnException as e:
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        instance_element_update = {"vim_net_id": vim_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return instance_element_update
def del_net(self, task):
    """
    Delete a network at VIM, together with the items stored at task["extra"]["created_items"]. Nothing is
    requested to the VIM when the task has no "vim_id".

    :param task: task dictionary. "status" and "error_msg" are modified with the result. A network that is not
        found at VIM is considered deleted, but "error_msg" keeps the reason
    :return: None
    """
    net_vim_id = task["vim_id"]
    try:
        if net_vim_id:
            self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        else:
            task["status"] = "FAILED"
        return None
    task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
    task["error_msg"] = None
    return None
def new_or_update_sdn_net(self, task):
    """
    Create, update or refresh the connectivity service of a network at the SDN/WIM connector.

    - When task["extra"]["find"] is present, its first item is looked for as an existing service. If the
      connector answers that it does not exist, a new one is created
    - The instance_interfaces of this network (task["item_id"]) are read from database and translated into
      service end points using self.port_mappings. Interfaces without "compute_node" and "pci" are still
      pending to be located by the VIM
    - With at least two end points, and some of them new or modified after the last update, the service is
      created (the first time) or edited. Otherwise, if the service exists, its status is refreshed

    :param task: task dictionary. It is modified with the result: "status", "vim_id", "error_msg", "extra"
    :return: dictionary with the content to update the instance element at database
    """
    wimconn_net_id = task["vim_id"]
    created_items = task["extra"].get("created_items")
    connected_ports = task["extra"].get("connected_ports", [])
    new_connected_ports = []
    last_update = task["extra"].get("last_update", 0)
    sdn_status = "BUILD"
    sdn_info = None

    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    error_list = []
    try:
        # FIND
        if task["extra"].get("find"):
            wimconn_id = task["extra"]["find"][0]
            try:
                # only called to check that the service exists; the returned status is not used
                self.sdnconnector.get_connectivity_service_status(wimconn_id)
                wimconn_net_id = wimconn_id
                instance_element_update = {"wim_internal_id": wimconn_net_id, "created": False, "status": "BUILD",
                                           "error_msg": None, }
                return instance_element_update
            except Exception as e:
                # the http status is read from 'http_code', the same attribute that del_sdn_net reads
                if isinstance(e, SdnConnectorError) and e.http_code == HTTPStatus.NOT_FOUND.value:
                    pass  # not found: go on and create it
                else:
                    self._proccess_sdn_exception(e)

        params = task["params"]
        # CREATE
        # look for ports
        sdn_ports = []
        pending_ports = 0

        ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]})
        sdn_need_update = False
        for port in ports:
            # TODO. Do not connect if already done
            if not (port.get("compute_node") and port.get("pci")):
                pending_ports += 1
                continue

            for port_map in self.port_mappings:
                if port_map.get("device_id") == port["compute_node"] and \
                        port_map.get("device_interface_id") == port["pci"]:
                    break
            else:
                if self.sdnconn_config.get("mapping_not_needed"):
                    port_map = {
                        "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]),
                        "service_endpoint_encapsulation_info": {
                            "vlan": port["vlan"],
                            "mac": port["mac_address"],
                            "device_id": port["compute_node"],
                            "device_interface_id": port["pci"]
                        }
                    }
                else:
                    port_map = None
                    error_list.append("Port mapping not found for compute_node={} pci={}".format(
                        port["compute_node"], port["pci"]))

            if port_map:
                if port["uuid"] not in connected_ports or port["modified_at"] > last_update:
                    sdn_need_update = True
                # every located port is recorded, not only the changed ones, because this list replaces
                # connected_ports after a successful create/edit
                new_connected_ports.append(port["uuid"])
                sdn_ports.append({
                    "service_endpoint_id": port_map["service_endpoint_id"],
                    "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port["vlan"],
                        "mac": port["mac_address"],
                        "device_id": port_map.get("device_id"),
                        "device_interface_id": port_map.get("device_interface_id"),
                        "switch_dpid": port_map.get("switch_dpid"),
                        "switch_port": port_map.get("switch_port"),
                        "service_mapping_info": port_map.get("service_mapping_info"),
                    }
                })

        if pending_ports:
            error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}"
                              .format(len(ports)-pending_ports, len(ports)))
        # if there are more ports to connect or they have been modified, call create/update
        if sdn_need_update and len(sdn_ports) >= 2:
            if not wimconn_net_id:
                if params[0] == "data":
                    net_type = "ELAN"
                elif params[0] == "ptp":
                    net_type = "ELINE"
                else:
                    net_type = "L3"

                wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service(net_type, sdn_ports)
            else:
                created_items = self.sdnconnector.edit_connectivity_service(wimconn_net_id, conn_info=created_items,
                                                                            connection_points=sdn_ports)
            last_update = time.time()
            connected_ports = new_connected_ports
        elif wimconn_net_id:
            try:
                wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id,
                                                                                    conn_info=created_items)
                sdn_status = wim_status_dict["sdn_status"]
                if wim_status_dict.get("error_msg"):
                    error_list.append(wim_status_dict.get("error_msg"))
                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info"))
            except Exception as e:
                self._proccess_sdn_exception(e)

        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["vim_status"] = sdn_status
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["extra"]["connected_ports"] = connected_ports
        task["extra"]["last_update"] = last_update
        task["error_msg"] = self._format_vim_error_msg(" ; ".join(error_list))
        task["vim_id"] = wimconn_net_id
        instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status,
                                   "created": True, "error_msg": task["error_msg"] or None}
    except (vimconn.vimconnException, SdnConnectorError) as e:
        self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e))
        task["status"] = "FAILED"
        task["vim_id"] = wimconn_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR",
                                   "error_msg": task["error_msg"]}
    if sdn_info:
        instance_element_update["wim_info"] = sdn_info
    return instance_element_update
def del_sdn_net(self, task):
    """
    Delete the connectivity service of a network at the SDN/WIM connector. Nothing is requested to the
    connector when the task has no "vim_id".

    :param task: task dictionary. "status" and "error_msg" are modified with the result. A service that the
        connector reports as not found is considered deleted, with no error
    :return: None
    """
    wimconn_net_id = task["vim_id"]
    try:
        # the inner handler passes any connector failure to _proccess_sdn_exception; the outer one handles the
        # SdnConnectorError coming out of it
        try:
            if wimconn_net_id:
                self.sdnconnector.delete_connectivity_service(wimconn_net_id, task["extra"].get("created_items"))
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
            task["error_msg"] = None
            return None
        except Exception as e:
            self._proccess_sdn_exception(e)
    except SdnConnectorError as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if e.http_code == HTTPStatus.NOT_FOUND.value:
            # If not found mark as Done and clear error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
            task["error_msg"] = None
            return None
        task["status"] = "FAILED"
        return None
1252 # Service Function Instances
def new_sfi(self, task):
    """
    Create a Service Function Instance at VIM.

    The ingress and egress interfaces at task["extra"]["params"] are translated into VIM interface ids using
    the "interfaces" stored by the task this one depends on (first item of task["extra"]["depends_on"]).

    :param task: task dictionary. It is modified with the result: "status", "vim_id", "error_msg", "extra"
    :return: dictionary with the content to update the instance element at database, or None when the ingress
        or the egress interface are not found (the task is marked as "FAILED")
    """
    try:
        # Waits for interfaces to be ready (avoids failure)
        time.sleep(1)
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        interfaces = task["depends"][dep_id]["extra"].get("interfaces")

        sfi_params = task.get("extra").get("params")
        ingress_interface_id = sfi_params.get("ingress_interface_id")
        egress_interface_id = sfi_params.get("egress_interface_id")
        ingress_vim_interface_id = None
        egress_vim_interface_id = None
        for vim_interface, interface_data in interfaces.items():
            if interface_data.get("interface_id") == ingress_interface_id:
                ingress_vim_interface_id = vim_interface
                break
        if ingress_interface_id != egress_interface_id:
            for vim_interface, interface_data in interfaces.items():
                if interface_data.get("interface_id") == egress_interface_id:
                    egress_vim_interface_id = vim_interface
                    break
        else:
            egress_vim_interface_id = ingress_vim_interface_id
        if not ingress_vim_interface_id or not egress_vim_interface_id:
            error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
                ingress_vim_interface_id, egress_vim_interface_id)
            self.logger.error(error_text)
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            return None
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
        # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
        ingress_port_id_list = [ingress_vim_interface_id]
        egress_port_id_list = [egress_vim_interface_id]
        name = "sfi-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return instance_element_update
def del_sfi(self, task):
    """
    Delete a Service Function Instance at VIM.

    :param task: task dictionary. "status" and "error_msg" are modified with the result. An SFI that is not
        found at VIM is considered deleted, but "error_msg" keeps the reason
    :return: None
    """
    sfi_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfi(sfi_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        else:
            task["status"] = "FAILED"
        return None
    task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
    task["error_msg"] = None
    return None
def new_sf(self, task):
    """
    Create a Service Function at VIM, grouping the Service Function Instances created by the tasks listed at
    task["extra"]["depends_on"].

    :param task: task dictionary. It is modified with the result: "status", "vim_id", "error_msg", "extra"
    :return: dictionary with the content to update the instance element at database
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        depends = task.get("depends")
        # vim ids of the SFIs, in the same order as the tasks this one depends on
        sfi_id_list = [depends.get("TASK-" + str(dep_id)).get("vim_id") for dep_id in task["extra"]["depends_on"]]
        name = "sf-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sf_id
        instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
        return instance_element_update
def del_sf(self, task):
    """
    Delete a Service Function at VIM.

    :param task: task dictionary. "status" and "error_msg" are modified with the result. An SF that is not
        found at VIM is considered deleted, but "error_msg" keeps the reason
    :return: None
    """
    sf_vim_id = task["vim_id"]
    try:
        self.vim.delete_sf(sf_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        else:
            task["status"] = "FAILED"
        return None
    task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
    task["error_msg"] = None
    return None
def new_classification(self, task):
    """
    Create a Classification (flow classifier) at VIM.

    The "logical_source_port" at task["params"] is translated into a VIM interface id using the "interfaces"
    stored by the task this one depends on (first item of task["extra"]["depends_on"]). The rest of
    task["params"] ("ip_proto", "source_ip", "destination_ip", "source_port", "destination_port") are
    optional and only added to the classifier definition when they have a value.

    :param task: task dictionary. It is modified with the result: "status", "vim_id", "error_msg", "extra"
    :return: dictionary with the content to update the instance element at database, or None when the logical
        source port is not found (the task is marked as "FAILED")
    """
    try:
        params = task["params"]
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        logical_source_port_vim_id = None
        logical_source_port_id = params.get("logical_source_port")
        for vim_interface, interface_data in interfaces.items():
            if interface_data.get("interface_id") == logical_source_port_id:
                logical_source_port_vim_id = vim_interface
                break
        if not logical_source_port_vim_id:
            error_text = "Error creating Flow Classifier, Logical Source Port id {}".format(
                logical_source_port_id)
            self.logger.error(error_text)
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            return None

        name = "c-{}".format(task["item_id"][:8])
        # the protocol is optional: convert it only when provided, int(None) would raise TypeError
        ip_proto = params.get("ip_proto")
        if ip_proto is not None:
            ip_proto = int(ip_proto)
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        source_port = params.get("source_port")
        destination_port = params.get("destination_port")
        definition = {"logical_source_port": logical_source_port_vim_id}
        if ip_proto:
            # well known protocol numbers are sent by name, any other one is sent as a number
            definition["protocol"] = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        # if not CIDR is given for the IP addresses, add /32:
        if source_ip:
            if '/' not in source_ip:
                source_ip += '/32'
            definition["source_ip_prefix"] = source_ip
        if source_port:
            definition["source_port_range_min"] = source_port
            definition["source_port_range_max"] = source_port
        if destination_port:
            definition["destination_port_range_min"] = destination_port
            definition["destination_port_range_max"] = destination_port
        if destination_ip:
            if '/' not in destination_ip:
                destination_ip += '/32'
            definition["destination_ip_prefix"] = destination_ip

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return instance_element_update
def del_classification(self, task):
    """
    Delete a Classification at VIM.

    :param task: task dictionary. "status" and "error_msg" are modified with the result. A classification that
        is not found at VIM is considered deleted, but "error_msg" keeps the reason
    :return: None
    """
    classification_vim_id = task["vim_id"]
    try:
        self.vim.delete_classification(classification_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        else:
            task["status"] = "FAILED"
        return None
    task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
    task["error_msg"] = None
    return None
def new_sfp(self, task):
    """
    Create a Service Function Path at VIM, out of the Service Functions and Classifications created by the
    tasks listed at task["extra"]["depends_on"]. Each of those tasks is sorted by its "item":
    "instance_sfs" or "instance_classifications"; any other item is ignored.

    :param task: task dictionary. It is modified with the result: "status", "vim_id", "error_msg", "extra"
    :return: dictionary with the content to update the instance element at database
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
                           task.get("extra").get("depends_on")]
        sf_id_list = []
        classification_id_list = []
        for dep in depending_tasks:
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        # the message names the element really being created here, a Service Function Path
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return instance_element_update
def del_sfp(self, task):
    """
    Delete a Service Function Path at VIM.

    :param task: task dictionary. "status" and "error_msg" are modified with the result. An SFP that is not
        found at VIM is considered deleted, but "error_msg" keeps the reason
    :return: None
    """
    sfp_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfp(sfp_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        else:
            task["status"] = "FAILED"
        return None
    task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
    task["error_msg"] = None
    return None
def _refres_sfps(self, task):
    """
    Call VIM to get SFPs status
    :param task: task of the SFP to refresh. Its "error_msg", extra["vim_status"] and extra["vim_info"] are
        updated when the VIM reports something different from what is stored
    :return: dictionary with the changes to apply at database ("status", "error_msg" and, if reported,
        "vim_info"), or None when nothing has changed
    """
    database_update = None

    vim_id = task["vim_id"]
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        vim_dict = self.vim.refresh_sfps_status([vim_id])
        vim_info = vim_dict[vim_id]
    except vimconn.vimconnException as e:
        # Mark the task at VIM_ERROR status
        self.logger.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

    self.logger.debug("task={} get-sfp: vim_sfp_id={} result={}".format(task_id, task["vim_id"], vim_info))
    # TODO: Revise this part
    vim_info_error_msg = None
    if vim_info.get("error_msg"):
        vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
    new_vim_info = vim_info.get("vim_info")
    status_changed = task["extra"].get("vim_status") != vim_info["status"]
    error_changed = task.get("error_msg") != vim_info_error_msg
    # an empty vim_info from the VIM never counts as a change and never overwrites the stored one
    info_changed = bool(new_vim_info) and task["extra"].get("vim_info") != new_vim_info
    if status_changed or error_changed or info_changed:
        database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
        task["extra"]["vim_status"] = vim_info["status"]
        task["error_msg"] = vim_info_error_msg
        if new_vim_info:
            database_update["vim_info"] = new_vim_info
            task["extra"]["vim_info"] = new_vim_info

    return database_update
def _refres_sfis(self, task):
    """
    Call VIM to get sfis status
    :param task: task of the SFI to refresh. Its "error_msg", extra["vim_status"] and extra["vim_info"] are
        updated when the VIM reports something different from what is stored
    :return: dictionary with the changes to apply at database ("status", "error_msg" and, if reported,
        "vim_info"), or None when nothing has changed
    """
    database_update = None

    vim_id = task["vim_id"]
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        vim_dict = self.vim.refresh_sfis_status([vim_id])
        vim_info = vim_dict[vim_id]
    except vimconn.vimconnException as e:
        # Mark the task at VIM_ERROR status
        self.logger.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

    self.logger.debug("task={} get-sfi: vim_sfi_id={} result={}".format(task_id, task["vim_id"], vim_info))
    # TODO: Revise this part
    vim_info_error_msg = None
    if vim_info.get("error_msg"):
        vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
    new_vim_info = vim_info.get("vim_info")
    status_changed = task["extra"].get("vim_status") != vim_info["status"]
    error_changed = task.get("error_msg") != vim_info_error_msg
    # an empty vim_info from the VIM never counts as a change and never overwrites the stored one
    info_changed = bool(new_vim_info) and task["extra"].get("vim_info") != new_vim_info
    if status_changed or error_changed or info_changed:
        database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
        task["extra"]["vim_status"] = vim_info["status"]
        task["error_msg"] = vim_info_error_msg
        if new_vim_info:
            database_update["vim_info"] = new_vim_info
            task["extra"]["vim_info"] = new_vim_info

    return database_update
def _refres_sfs(self, task):
    """
    Call VIM to get sfs status
    :param task: task of the SF to refresh. Its "error_msg", extra["vim_status"] and extra["vim_info"] are
        updated when the VIM reports something different from what is stored
    :return: dictionary with the changes to apply at database ("status", "error_msg" and, if reported,
        "vim_info"), or None when nothing has changed
    """
    database_update = None

    vim_id = task["vim_id"]
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        vim_dict = self.vim.refresh_sfs_status([vim_id])
        vim_info = vim_dict[vim_id]
    except vimconn.vimconnException as e:
        # Mark the task at VIM_ERROR status
        self.logger.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

    self.logger.debug("task={} get-sf: vim_sf_id={} result={}".format(task_id, task["vim_id"], vim_info))
    # TODO: Revise this part
    vim_info_error_msg = None
    if vim_info.get("error_msg"):
        vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
    new_vim_info = vim_info.get("vim_info")
    status_changed = task["extra"].get("vim_status") != vim_info["status"]
    error_changed = task.get("error_msg") != vim_info_error_msg
    # an empty vim_info from the VIM never counts as a change and never overwrites the stored one
    info_changed = bool(new_vim_info) and task["extra"].get("vim_info") != new_vim_info
    if status_changed or error_changed or info_changed:
        database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
        task["extra"]["vim_status"] = vim_info["status"]
        task["error_msg"] = vim_info_error_msg
        if new_vim_info:
            database_update["vim_info"] = new_vim_info
            task["extra"]["vim_info"] = new_vim_info

    return database_update
def _refres_classifications(self, task):
    """
    Call VIM to get classifications status
    :param task: task of the classification to refresh. Its "error_msg", extra["vim_status"] and
        extra["vim_info"] are updated when the VIM reports something different from what is stored
    :return: dictionary with the changes to apply at database ("status", "error_msg" and, if reported,
        "vim_info"), or None when nothing has changed
    """
    database_update = None

    vim_id = task["vim_id"]
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        vim_dict = self.vim.refresh_classifications_status([vim_id])
        vim_info = vim_dict[vim_id]
    except vimconn.vimconnException as e:
        # Mark the task at VIM_ERROR status
        self.logger.error("task={} get-classification: vimconnException when trying to refresh classifications {}"
                          .format(task_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

    self.logger.debug("task={} get-classification: vim_classification_id={} result={}".format(task_id,
                                                                                              task["vim_id"], vim_info))
    # TODO: Revise this part
    vim_info_error_msg = None
    if vim_info.get("error_msg"):
        vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
    new_vim_info = vim_info.get("vim_info")
    status_changed = task["extra"].get("vim_status") != vim_info["status"]
    error_changed = task.get("error_msg") != vim_info_error_msg
    # an empty vim_info from the VIM never counts as a change and never overwrites the stored one
    info_changed = bool(new_vim_info) and task["extra"].get("vim_info") != new_vim_info
    if status_changed or error_changed or info_changed:
        database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
        task["extra"]["vim_status"] = vim_info["status"]
        task["error_msg"] = vim_info_error_msg
        if new_vim_info:
            database_update["vim_info"] = new_vim_info
            task["extra"]["vim_info"] = new_vim_info

    return database_update