1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
83 from osm_ro_plugin
import vimconn
84 from osm_ro_plugin
.sdnconn
import SdnConnectorError
86 from osm_ro
.db_base
import db_base_Exception
87 from http
import HTTPStatus
88 from copy
import deepcopy
90 __author__
= "Alfonso Tierno, Pablo Montes"
91 __date__
= "$28-Sep-2017 12:07:15$"
94 def is_task_id(task_id
):
95 return task_id
.startswith("TASK-")
98 class VimThreadException(Exception):
102 class VimThreadExceptionNotFound(VimThreadException
):
106 class vim_thread(threading
.Thread
):
107 REFRESH_BUILD
= 5 # 5 seconds
108 REFRESH_ACTIVE
= 60 # 1 minute
110 REFRESH_DELETE
= 3600 * 10
112 def __init__(self
, task_lock
, plugins
, name
=None, wim_account_id
=None, datacenter_tenant_id
=None, db
=None):
116 'name' name of thread
117 'host','user': host ip or name to manage and user
118 'db', 'db_lock': database class and lock to use it in exclusion
120 threading
.Thread
.__init
__(self
)
121 self
.plugins
= plugins
122 self
.plugin_name
= "unknown"
124 self
.sdnconnector
= None
125 self
.sdnconn_config
= None
126 self
.error_status
= None
127 self
.wim_account_id
= wim_account_id
128 self
.datacenter_tenant_id
= datacenter_tenant_id
129 self
.port_mappings
= None
130 if self
.wim_account_id
:
131 self
.target_k
= "wim_account_id"
132 self
.target_v
= self
.wim_account_id
134 self
.target_k
= "datacenter_vim_id"
135 self
.target_v
= self
.datacenter_tenant_id
137 self
.name
= wim_account_id
or str(datacenter_tenant_id
)
140 self
.vim_persistent_info
= {}
141 self
.my_id
= self
.name
[:64]
143 self
.logger
= logging
.getLogger('openmano.{}.{}'.format("vim" if self
.datacenter_tenant_id
else "sdn",
147 self
.task_lock
= task_lock
148 self
.task_queue
= queue
.Queue(2000)
150 def _proccess_sdn_exception(self
, exc
):
151 if isinstance(exc
, SdnConnectorError
):
154 self
.logger
.error("plugin={} throws a non SdnConnectorError exception {}".format(self
.plugin_name
, exc
),
156 raise SdnConnectorError(str(exc
), http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
.value
) from exc
158 def _proccess_vim_exception(self
, exc
):
159 if isinstance(exc
, vimconn
.VimConnException
):
162 self
.logger
.error("plugin={} throws a non vimconnException exception {}".format(self
.plugin_name
, exc
),
164 raise vimconn
.VimConnException(str(exc
), http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
.value
) from exc
166 def get_vim_sdn_connector(self
):
167 if self
.datacenter_tenant_id
:
169 from_
= "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
170 select_
= ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
171 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
172 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
173 'user', 'passwd', 'dt.config as dt_config')
174 where_
= {"dt.uuid": self
.datacenter_tenant_id
}
175 vims
= self
.db
.get_rows(FROM
=from_
, SELECT
=select_
, WHERE
=where_
)
179 vim_config
.update(yaml
.load(vim
["config"], Loader
=yaml
.Loader
))
181 vim_config
.update(yaml
.load(vim
["dt_config"], Loader
=yaml
.Loader
))
182 vim_config
['datacenter_tenant_id'] = vim
.get('datacenter_tenant_id')
183 vim_config
['datacenter_id'] = vim
.get('datacenter_id')
186 # vim_port_mappings = self.ovim.get_of_port_mappings(
187 # db_filter={"datacenter_id": vim_config['datacenter_id']})
188 # vim_config["wim_external_ports"] = [x for x in vim_port_mappings
189 # if x["service_mapping_info"].get("wim")]
190 self
.plugin_name
= "rovim_" + vim
["type"]
191 self
.vim
= self
.plugins
[self
.plugin_name
](
192 uuid
=vim
['datacenter_id'], name
=vim
['datacenter_name'],
193 tenant_id
=vim
['vim_tenant_id'], tenant_name
=vim
['vim_tenant_name'],
194 url
=vim
['vim_url'], url_admin
=vim
['vim_url_admin'],
195 user
=vim
['user'], passwd
=vim
['passwd'],
196 config
=vim_config
, persistent_info
=self
.vim_persistent_info
198 self
.error_status
= None
199 self
.logger
.info("Vim Connector loaded for vim_account={}, plugin={}".format(
200 self
.datacenter_tenant_id
, self
.plugin_name
))
201 except Exception as e
:
202 self
.logger
.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format(
203 self
.datacenter_tenant_id
, self
.plugin_name
, e
))
205 self
.error_status
= "Error loading vimconnector: {}".format(e
)
208 wim_account
= self
.db
.get_rows(FROM
="wim_accounts", WHERE
={"uuid": self
.wim_account_id
})[0]
209 wim
= self
.db
.get_rows(FROM
="wims", WHERE
={"uuid": wim_account
["wim_id"]})[0]
211 self
.sdnconn_config
= yaml
.load(wim
["config"], Loader
=yaml
.Loader
)
213 self
.sdnconn_config
= {}
214 if wim_account
["config"]:
215 self
.sdnconn_config
.update(yaml
.load(wim_account
["config"], Loader
=yaml
.Loader
))
216 self
.port_mappings
= self
.db
.get_rows(FROM
="wim_port_mappings", WHERE
={"wim_id": wim_account
["wim_id"]})
217 if self
.port_mappings
:
218 self
.sdnconn_config
["service_endpoint_mapping"] = self
.port_mappings
219 self
.plugin_name
= "rosdn_" + wim
["type"]
220 self
.sdnconnector
= self
.plugins
[self
.plugin_name
](
221 wim
, wim_account
, config
=self
.sdnconn_config
)
222 self
.error_status
= None
223 self
.logger
.info("Sdn Connector loaded for wim_account={}, plugin={}".format(
224 self
.wim_account_id
, self
.plugin_name
))
225 except Exception as e
:
226 self
.logger
.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format(
227 self
.wim_account_id
, self
.plugin_name
, e
), exc_info
=True)
228 self
.sdnconnector
= None
229 self
.error_status
= self
._format
_vim
_error
_msg
("Error loading sdn connector: {}".format(e
))
231 def _get_db_task(self
):
233 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
241 # get 20 (database_limit) entries each time
242 vim_actions
= self
.db
.get_rows(FROM
="vim_wim_actions",
243 WHERE
={self
.target_k
: self
.target_v
,
244 "status": ['SCHEDULED', 'BUILD', 'DONE'],
245 "worker": [None, self
.my_id
], "modified_at<=": now
247 ORDER_BY
=("modified_at", "created_at",),
248 LIMIT
=database_limit
)
251 # if vim_actions[0]["modified_at"] > now:
252 # return int(vim_actions[0] - now)
253 for task
in vim_actions
:
255 if task_related
== task
["related"]:
256 continue # ignore if a locking has already tried for these task set
257 task_related
= task
["related"]
259 self
.db
.update_rows("vim_wim_actions", UPDATE
={"worker": self
.my_id
}, modified_time
=0,
260 WHERE
={self
.target_k
: self
.target_v
,
261 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
262 "worker": [None, self
.my_id
],
263 "related": task_related
,
264 "item": task
["item"],
266 # ... and read all related and check if locked
267 related_tasks
= self
.db
.get_rows(FROM
="vim_wim_actions",
268 WHERE
={self
.target_k
: self
.target_v
,
269 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
270 "related": task_related
,
271 "item": task
["item"],
273 ORDER_BY
=("created_at",))
274 # check that all related tasks have been locked. If not release and try again. It can happen
275 # for race conditions if a new related task has been inserted by nfvo in the process
276 some_tasks_locked
= False
277 some_tasks_not_locked
= False
279 for relate_task
in related_tasks
:
280 if relate_task
["worker"] != self
.my_id
:
281 some_tasks_not_locked
= True
283 some_tasks_locked
= True
284 if not creation_task
and relate_task
["action"] in ("CREATE", "FIND"):
285 creation_task
= relate_task
286 if some_tasks_not_locked
:
287 if some_tasks_locked
: # unlock
288 self
.db
.update_rows("vim_wim_actions", UPDATE
={"worker": None}, modified_time
=0,
289 WHERE
={self
.target_k
: self
.target_v
,
290 "worker": self
.my_id
,
291 "related": task_related
,
292 "item": task
["item"],
296 task
["params"] = None
298 extra
= yaml
.load(task
["extra"], Loader
=yaml
.Loader
)
301 task
["extra"] = extra
302 if extra
.get("depends_on"):
304 if extra
.get("params"):
305 task
["params"] = deepcopy(extra
["params"])
306 return task
, related_tasks
307 except Exception as e
:
308 self
.logger
.critical("Unexpected exception at _get_db_task: " + str(e
), exc_info
=True)
311 def _delete_task(self
, task
):
313 Determine if this task need to be done or superseded
317 def copy_extra_created(copy_to
, copy_from
):
318 copy_to
["created"] = copy_from
["created"]
319 if copy_from
.get("sdn_net_id"):
320 copy_to
["sdn_net_id"] = copy_from
["sdn_net_id"]
321 if copy_from
.get("interfaces"):
322 copy_to
["interfaces"] = copy_from
["interfaces"]
323 if copy_from
.get("sdn-ports"):
324 copy_to
["sdn-ports"] = copy_from
["sdn-ports"]
325 if copy_from
.get("created_items"):
326 if not copy_to
.get("created_items"):
327 copy_to
["created_items"] = {}
328 copy_to
["created_items"].update(copy_from
["created_items"])
331 dependency_task
= None
332 deletion_needed
= task
["extra"].get("created", False)
333 if task
["status"] == "FAILED":
334 return # TODO need to be retry??
336 # get all related tasks. task of creation must be the first in the list of related_task,
337 # unless the deletion fails and it is pendingit fails
338 # TODO this should be removed, passing related_tasks
339 related_tasks
= self
.db
.get_rows(FROM
="vim_wim_actions",
340 WHERE
={self
.target_k
: self
.target_v
,
341 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
342 "action": ["FIND", "CREATE"],
343 "related": task
["related"],
345 ORDER_BY
=("created_at",),
347 for related_task
in related_tasks
:
348 if related_task
["item"] == task
["item"] and related_task
["item_id"] == task
["item_id"]:
349 task_create
= related_task
351 if related_task
["extra"]:
352 extra_created
= yaml
.load(related_task
["extra"], Loader
=yaml
.Loader
)
353 if extra_created
.get("created"):
354 deletion_needed
= True
355 related_task
["extra"] = extra_created
356 elif not dependency_task
:
357 dependency_task
= related_task
358 if task_create
and dependency_task
:
361 # mark task_create as FINISHED
363 self
.db
.update_rows("vim_wim_actions", UPDATE
={"status": "FINISHED"},
364 WHERE
={self
.target_k
: self
.target_v
,
365 "instance_action_id": task_create
["instance_action_id"],
366 "task_index": task_create
["task_index"]
368 if not deletion_needed
:
370 elif dependency_task
:
371 # move create information from task_create to relate_task
372 extra_new_created
= yaml
.load(dependency_task
["extra"], Loader
=yaml
.Loader
) or {}
373 extra_new_created
["created"] = extra_created
["created"]
374 copy_extra_created(copy_to
=extra_new_created
, copy_from
=extra_created
)
376 self
.db
.update_rows("vim_wim_actions",
377 UPDATE
={"extra": yaml
.safe_dump(extra_new_created
, default_flow_style
=True,
379 "vim_id": task_create
.get("vim_id")},
380 WHERE
={self
.target_k
: self
.target_v
,
381 "instance_action_id": dependency_task
["instance_action_id"],
382 "task_index": dependency_task
["task_index"]
386 task
["vim_id"] = task_create
["vim_id"]
387 copy_extra_created(copy_to
=task
["extra"], copy_from
=task_create
["extra"])
388 # Ensure this task extra information is stored at database
389 self
.db
.update_rows("vim_wim_actions",
390 UPDATE
={"extra": yaml
.safe_dump(task
["extra"], default_flow_style
=True,
392 WHERE
={self
.target_k
: self
.target_v
,
393 "instance_action_id": task
["instance_action_id"],
394 "task_index": task
["task_index"],
397 return deletion_needed
399 except Exception as e
:
400 self
.logger
.critical("Unexpected exception at _delete_task: " + str(e
), exc_info
=True)
402 def _refres_vm(self
, task
):
403 """Call VIM to get VMs status"""
404 database_update
= None
406 vim_id
= task
["vim_id"]
407 vm_to_refresh_list
= [vim_id
]
409 vim_dict
= self
.vim
.refresh_vms_status(vm_to_refresh_list
)
410 vim_info
= vim_dict
[vim_id
]
411 except vimconn
.VimConnException
as e
:
412 # Mark all tasks at VIM_ERROR status
413 self
.logger
.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e
))
414 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
416 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
417 self
.logger
.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id
, task
["vim_id"], vim_info
))
419 # check and update interfaces
420 task_warning_msg
= ""
421 for interface
in vim_info
.get("interfaces", ()):
422 vim_interface_id
= interface
["vim_interface_id"]
423 if vim_interface_id
not in task
["extra"]["interfaces"]:
424 self
.logger
.critical("task={} get-VM: Interface not found {} on task info {}".format(
425 task_id
, vim_interface_id
, task
["extra"]["interfaces"]), exc_info
=True)
427 task_interface
= task
["extra"]["interfaces"][vim_interface_id
]
428 task_vim_interface
= task_interface
.get("vim_info")
429 if task_vim_interface
!= interface
:
431 # if task_interface.get("sdn_port_id"):
433 # self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
434 # task_interface["sdn_port_id"] = None
435 # except ovimException as e:
436 # error_text = "ovimException deleting external_port={}: {}".format(
437 # task_interface["sdn_port_id"], e)
438 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
439 # task_warning_msg += error_text
440 # # TODO Set error_msg at instance_nets instead of instance VMs
443 # sdn_net_id = task_interface.get("sdn_net_id")
444 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
445 # sdn_port_name = sdn_net_id + "." + task["vim_id"]
446 # sdn_port_name = sdn_port_name[:63]
448 # sdn_port_id = self.ovim.new_external_port(
449 # {"compute_node": interface["compute_node"],
450 # "pci": interface["pci"],
451 # "vlan": interface.get("vlan"),
452 # "net_id": sdn_net_id,
453 # "region": self.vim["config"]["datacenter_id"],
454 # "name": sdn_port_name,
455 # "mac": interface.get("mac_address")})
456 # task_interface["sdn_port_id"] = sdn_port_id
457 # except (ovimException, Exception) as e:
458 # error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
459 # format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
460 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
461 # task_warning_msg += error_text
462 # # TODO Set error_msg at instance_nets instead of instance VMs
464 self
.db
.update_rows('instance_interfaces',
465 UPDATE
={"mac_address": interface
.get("mac_address"),
466 "ip_address": interface
.get("ip_address"),
467 "vim_interface_id": interface
.get("vim_interface_id"),
468 "vim_info": interface
.get("vim_info"),
469 "sdn_port_id": task_interface
.get("sdn_port_id"),
470 "compute_node": interface
.get("compute_node"),
471 "pci": interface
.get("pci"),
472 "vlan": interface
.get("vlan")},
473 WHERE
={'uuid': task_interface
["iface_id"]})
474 task_interface
["vim_info"] = interface
475 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
476 # # TODO Send message to task SDN to update
478 # check and update task and instance_vms database
479 vim_info_error_msg
= None
480 if vim_info
.get("error_msg"):
481 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info
["error_msg"] + task_warning_msg
)
482 elif task_warning_msg
:
483 vim_info_error_msg
= self
._format
_vim
_error
_msg
(task_warning_msg
)
484 task_vim_info
= task
["extra"].get("vim_info")
485 task_error_msg
= task
.get("error_msg")
486 task_vim_status
= task
["extra"].get("vim_status")
487 if task_vim_status
!= vim_info
["status"] or task_error_msg
!= vim_info_error_msg
or \
488 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
489 database_update
= {"status": vim_info
["status"], "error_msg": vim_info_error_msg
}
490 if vim_info
.get("vim_info"):
491 database_update
["vim_info"] = vim_info
["vim_info"]
493 task
["extra"]["vim_status"] = vim_info
["status"]
494 task
["error_msg"] = vim_info_error_msg
495 if vim_info
.get("vim_info"):
496 task
["extra"]["vim_info"] = vim_info
["vim_info"]
498 return database_update
500 def _refres_net(self
, task
):
501 """Call VIM to get network status"""
502 database_update
= None
504 vim_id
= task
["vim_id"]
505 net_to_refresh_list
= [vim_id
]
507 vim_dict
= self
.vim
.refresh_nets_status(net_to_refresh_list
)
508 vim_info
= vim_dict
[vim_id
]
509 except vimconn
.VimConnException
as e
:
510 # Mark all tasks at VIM_ERROR status
511 self
.logger
.error("task=several get-net: vimconnException when trying to refresh nets " + str(e
))
512 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
514 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
515 self
.logger
.debug("task={} get-net: vim_net_id={} result={}".format(task_id
, task
["vim_id"], vim_info
))
517 task_vim_info
= task
["extra"].get("vim_info")
518 task_vim_status
= task
["extra"].get("vim_status")
519 task_error_msg
= task
.get("error_msg")
520 # task_sdn_net_id = task["extra"].get("sdn_net_id")
522 vim_info_status
= vim_info
["status"]
523 vim_info_error_msg
= vim_info
.get("error_msg")
525 # if task_sdn_net_id:
527 # sdn_net = self.ovim.show_network(task_sdn_net_id)
528 # except (ovimException, Exception) as e:
529 # text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
530 # self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
531 # sdn_net = {"status": "ERROR", "last_error": text_error}
532 # if sdn_net["status"] == "ERROR":
533 # if not vim_info_error_msg:
534 # vim_info_error_msg = str(sdn_net.get("last_error"))
536 # vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
537 # self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
538 # self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
539 # vim_info_status = "ERROR"
540 # elif sdn_net["status"] == "BUILD":
541 # if vim_info_status == "ACTIVE":
542 # vim_info_status = "BUILD"
545 if vim_info_error_msg
:
546 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info_error_msg
)
547 if task_vim_status
!= vim_info_status
or task_error_msg
!= vim_info_error_msg
or \
548 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
549 task
["extra"]["vim_status"] = vim_info_status
550 task
["error_msg"] = vim_info_error_msg
551 if vim_info
.get("vim_info"):
552 task
["extra"]["vim_info"] = vim_info
["vim_info"]
553 database_update
= {"status": vim_info_status
, "error_msg": vim_info_error_msg
}
554 if vim_info
.get("vim_info"):
555 database_update
["vim_info"] = vim_info
["vim_info"]
556 return database_update
558 def _proccess_pending_tasks(self
, task
, related_tasks
):
559 old_task_status
= task
["status"]
560 create_or_find
= False # if as result of processing this task something is created or found
562 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
565 if task
["status"] == "SCHEDULED":
566 # check if tasks that this depends on have been completed
567 dependency_not_completed
= False
568 dependency_modified_at
= 0
569 for task_index
in task
["extra"].get("depends_on", ()):
570 task_dependency
= self
._look
_for
_task
(task
["instance_action_id"], task_index
)
571 if not task_dependency
:
572 raise VimThreadException(
573 "Cannot get depending net task trying to get depending task {}.{}".format(
574 task
["instance_action_id"], task_index
))
575 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
576 # database must be look again
577 if task_dependency
["status"] == "SCHEDULED":
578 dependency_not_completed
= True
579 dependency_modified_at
= task_dependency
["modified_at"]
581 elif task_dependency
["status"] == "FAILED":
582 raise VimThreadException(
583 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
584 task
["action"], task
["item"],
585 task
["instance_action_id"], task
["task_index"],
586 task_dependency
["instance_action_id"], task_dependency
["task_index"],
587 task_dependency
["action"], task_dependency
["item"], task_dependency
.get("error_msg")))
589 task
["depends"]["TASK-"+str(task_index
)] = task_dependency
590 task
["depends"]["TASK-{}.{}".format(task
["instance_action_id"], task_index
)] = task_dependency
591 if dependency_not_completed
:
592 # Move this task to the time dependency is going to be modified plus 10 seconds.
593 self
.db
.update_rows("vim_wim_actions", modified_time
=dependency_modified_at
+ 10,
594 UPDATE
={"worker": None},
595 WHERE
={self
.target_k
: self
.target_v
, "worker": self
.my_id
,
596 "related": task
["related"],
598 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
599 # if task["extra"]["tries"] > 3:
600 # raise VimThreadException(
601 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
602 # "(task {}.{})".format(task["action"], task["item"],
603 # task["instance_action_id"], task["task_index"],
604 # task_dependency["instance_action_id"], task_dependency["task_index"]
605 # task_dependency["action"], task_dependency["item"]))
608 database_update
= None
609 if task
["action"] == "DELETE":
610 deleted_needed
= self
._delete
_task
(task
)
611 if not deleted_needed
:
612 task
["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
613 task
["error_msg"] = None
615 if task
["status"] == "SUPERSEDED":
616 # not needed to do anything but update database with the new status
617 database_update
= None
618 elif not self
.vim
and not self
.sdnconnector
:
619 task
["status"] = "FAILED"
620 task
["error_msg"] = self
.error_status
621 database_update
= {"status": "VIM_ERROR" if self
.datacenter_tenant_id
else "WIM_ERROR",
622 "error_msg": task
["error_msg"]}
623 elif task
["item_id"] != related_tasks
[0]["item_id"] and task
["action"] in ("FIND", "CREATE"):
624 # Do nothing, just copy values from one to another and update database
625 task
["status"] = related_tasks
[0]["status"]
626 task
["error_msg"] = related_tasks
[0]["error_msg"]
627 task
["vim_id"] = related_tasks
[0]["vim_id"]
628 extra
= yaml
.load(related_tasks
[0]["extra"], Loader
=yaml
.Loader
)
629 task
["extra"]["vim_status"] = extra
.get("vim_status")
630 next_refresh
= related_tasks
[0]["modified_at"] + 0.001
631 database_update
= {"status": task
["extra"].get("vim_status", "VIM_ERROR"),
632 "error_msg": task
["error_msg"]}
633 if task
["item"] == 'instance_vms':
634 database_update
["vim_vm_id"] = task
["vim_id"]
635 elif task
["item"] == 'instance_nets':
636 database_update
["vim_net_id"] = task
["vim_id"]
637 elif task
["item"] == 'instance_vms':
638 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
639 database_update
= self
._refres
_vm
(task
)
640 create_or_find
= True
641 elif task
["action"] == "CREATE":
642 create_or_find
= True
643 database_update
= self
.new_vm(task
)
644 elif task
["action"] == "DELETE":
647 raise vimconn
.VimConnException(self
.name
+ "unknown task action {}".format(task
["action"]))
648 elif task
["item"] == 'instance_nets':
649 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
650 database_update
= self
._refres
_net
(task
)
651 create_or_find
= True
652 elif task
["action"] == "CREATE":
653 create_or_find
= True
654 database_update
= self
.new_net(task
)
655 elif task
["action"] == "DELETE":
657 elif task
["action"] == "FIND":
658 database_update
= self
.get_net(task
)
660 raise vimconn
.VimConnException(self
.name
+ "unknown task action {}".format(task
["action"]))
661 elif task
["item"] == 'instance_wim_nets':
662 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
663 database_update
= self
.new_or_update_sdn_net(task
)
664 create_or_find
= True
665 elif task
["action"] == "CREATE":
666 create_or_find
= True
667 database_update
= self
.new_or_update_sdn_net(task
)
668 elif task
["action"] == "DELETE":
669 self
.del_sdn_net(task
)
670 elif task
["action"] == "FIND":
671 database_update
= self
.get_sdn_net(task
)
673 raise vimconn
.VimConnException(self
.name
+ "unknown task action {}".format(task
["action"]))
674 elif task
["item"] == 'instance_sfis':
675 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
676 database_update
= self
._refres
_sfis
(task
)
677 create_or_find
= True
678 elif task
["action"] == "CREATE":
679 create_or_find
= True
680 database_update
= self
.new_sfi(task
)
681 elif task
["action"] == "DELETE":
684 raise vimconn
.VimConnException(self
.name
+ "unknown task action {}".format(task
["action"]))
685 elif task
["item"] == 'instance_sfs':
686 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
687 database_update
= self
._refres
_sfs
(task
)
688 create_or_find
= True
689 elif task
["action"] == "CREATE":
690 create_or_find
= True
691 database_update
= self
.new_sf(task
)
692 elif task
["action"] == "DELETE":
695 raise vimconn
.VimConnException(self
.name
+ "unknown task action {}".format(task
["action"]))
696 elif task
["item"] == 'instance_classifications':
697 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
698 database_update
= self
._refres
_classifications
(task
)
699 create_or_find
= True
700 elif task
["action"] == "CREATE":
701 create_or_find
= True
702 database_update
= self
.new_classification(task
)
703 elif task
["action"] == "DELETE":
704 self
.del_classification(task
)
706 raise vimconn
.VimConnException(self
.name
+ "unknown task action {}".format(task
["action"]))
707 elif task
["item"] == 'instance_sfps':
708 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
709 database_update
= self
._refres
_sfps
(task
)
710 create_or_find
= True
711 elif task
["action"] == "CREATE":
712 create_or_find
= True
713 database_update
= self
.new_sfp(task
)
714 elif task
["action"] == "DELETE":
717 raise vimconn
.VimConnException(self
.name
+ "unknown task action {}".format(task
["action"]))
719 raise vimconn
.VimConnException(self
.name
+ "unknown task item {}".format(task
["item"]))
721 except Exception as e
:
722 if not isinstance(e
, VimThreadException
):
723 self
.logger
.error("Error executing task={}: {}".format(task_id
, e
), exc_info
=True)
724 task
["error_msg"] = str(e
)
725 task
["status"] = "FAILED"
726 database_update
= {"status": "VIM_ERROR" if task
["item"] != "instance_wim_nets" else "WIM_ERROR",
727 "error_msg": task
["error_msg"]}
728 # if task["item"] == 'instance_vms':
729 # database_update["vim_vm_id"] = None
730 # elif task["item"] == 'instance_nets':
731 # database_update["vim_net_id"] = None
733 self
.logger
.debug("task={} item={} action={} result={}:'{}' params={}".format(
734 task_id
, task
["item"], task
["action"], task
["status"],
735 task
["vim_id"] if task
["status"] == "DONE" else task
.get("error_msg"), task
["params"]))
738 if task
["status"] == "DONE":
739 next_refresh
= time
.time()
740 if task
["extra"].get("vim_status") == "BUILD":
741 next_refresh
+= self
.REFRESH_BUILD
742 elif task
["extra"].get("vim_status") in ("ERROR", "VIM_ERROR", "WIM_ERROR"):
743 next_refresh
+= self
.REFRESH_ERROR
744 elif task
["extra"].get("vim_status") == "DELETED":
745 next_refresh
+= self
.REFRESH_DELETE
747 next_refresh
+= self
.REFRESH_ACTIVE
748 elif task
["status"] == "FAILED":
749 next_refresh
= time
.time() + self
.REFRESH_DELETE
752 # modify all related task with action FIND/CREATED non SCHEDULED
754 table
="vim_wim_actions", modified_time
=next_refresh
+ 0.001,
755 UPDATE
={"status": task
["status"], "vim_id": task
.get("vim_id"),
756 "error_msg": task
["error_msg"],
759 WHERE
={self
.target_k
: self
.target_v
,
760 "worker": self
.my_id
,
761 "action": ["FIND", "CREATE"],
762 "related": task
["related"],
763 "status<>": "SCHEDULED",
767 table
="vim_wim_actions", modified_time
=next_refresh
,
768 UPDATE
={"status": task
["status"], "vim_id": task
.get("vim_id"),
769 "error_msg": task
["error_msg"],
770 "extra": yaml
.safe_dump(task
["extra"], default_flow_style
=True, width
=256)},
771 WHERE
={"instance_action_id": task
["instance_action_id"], "task_index": task
["task_index"]})
774 table
="vim_wim_actions", modified_time
=0,
775 UPDATE
={"worker": None},
776 WHERE
={self
.target_k
: self
.target_v
,
777 "worker": self
.my_id
,
778 "related": task
["related"],
781 # Update table instance_actions
782 if old_task_status
== "SCHEDULED" and task
["status"] != old_task_status
:
784 table
="instance_actions",
785 UPDATE
={("number_failed" if task
["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
786 WHERE
={"uuid": task
["instance_action_id"]})
788 where_filter
= {"related": task
["related"]}
789 if task
["item"] == "instance_nets" and task
["datacenter_vim_id"]:
790 where_filter
["datacenter_tenant_id"] = task
["datacenter_vim_id"]
791 self
.db
.update_rows(table
=task
["item"],
792 UPDATE
=database_update
,
794 except db_base_Exception
as e
:
795 self
.logger
.error("task={} Error updating database {}".format(task_id
, e
), exc_info
=True)
797 def insert_task(self
, task
):
799 self
.task_queue
.put(task
, False)
802 raise vimconn
.VimConnException(self
.name
+ ": timeout inserting a task")
804 def del_task(self
, task
):
806 if task
["status"] == "SCHEDULED":
807 task
["status"] = "SUPERSEDED"
809 else: # task["status"] == "processing"
810 self
.task_lock
.release()
814 self
.logger
.info("Starting")
816 self
.get_vim_sdn_connector()
817 reload_thread
= False
821 while not self
.task_queue
.empty():
822 task
= self
.task_queue
.get()
823 if isinstance(task
, list):
825 elif isinstance(task
, str):
828 elif task
== 'reload':
831 self
.task_queue
.task_done()
835 task
, related_tasks
= self
._get
_db
_task
()
837 self
._proccess
_pending
_tasks
(task
, related_tasks
)
841 except Exception as e
:
842 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
844 self
.logger
.debug("Finishing")
846 def _look_for_task(self
, instance_action_id
, task_id
):
848 Look for a concrete task at vim_actions database table
849 :param instance_action_id: The instance_action_id
850 :param task_id: Can have several formats:
851 <task index>: integer
852 TASK-<task index> :backward compatibility,
853 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
854 :return: Task dictionary or None if not found
856 if isinstance(task_id
, int):
859 if task_id
.startswith("TASK-"):
860 task_id
= task_id
[5:]
861 ins_action_id
, _
, task_index
= task_id
.rpartition(".")
863 instance_action_id
= ins_action_id
865 tasks
= self
.db
.get_rows(FROM
="vim_wim_actions", WHERE
={"instance_action_id": instance_action_id
,
866 "task_index": task_index
})
870 task
["params"] = None
873 extra
= yaml
.load(task
["extra"], Loader
=yaml
.Loader
)
874 task
["extra"] = extra
875 task
["params"] = extra
.get("params")
881 def _format_vim_error_msg(error_text
, max_length
=1024):
882 if error_text
and len(error_text
) >= max_length
:
883 return error_text
[:max_length
// 2 - 3] + " ... " + error_text
[-max_length
// 2 + 3:]
886 def new_vm(self
, task
):
887 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
889 params
= task
["params"]
890 depends
= task
.get("depends")
893 if "net_id" in net
and is_task_id(net
["net_id"]): # change task_id into network_id
894 network_id
= task
["depends"][net
["net_id"]].get("vim_id")
896 raise VimThreadException(
897 "Cannot create VM because depends on a network not created or found: " +
898 str(depends
[net
["net_id"]]["error_msg"]))
899 net
["net_id"] = network_id
900 params_copy
= deepcopy(params
)
901 vim_vm_id
, created_items
= self
.vim
.new_vminstance(*params_copy
)
903 # fill task_interfaces. Look for snd_net_id at database for each interface
905 for iface
in params_copy
[5]:
906 task_interfaces
[iface
["vim_id"]] = {"iface_id": iface
["uuid"]}
907 result
= self
.db
.get_rows(
908 SELECT
=('sdn_net_id', 'interface_id'),
909 FROM
='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
910 WHERE
={'ii.uuid': iface
["uuid"]})
912 task_interfaces
[iface
["vim_id"]]["sdn_net_id"] = result
[0]['sdn_net_id']
913 task_interfaces
[iface
["vim_id"]]["interface_id"] = result
[0]['interface_id']
915 self
.logger
.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id
,
919 task
["vim_info"] = {}
920 task
["extra"]["interfaces"] = task_interfaces
921 task
["extra"]["created"] = True
922 task
["extra"]["created_items"] = created_items
923 task
["extra"]["vim_status"] = "BUILD"
924 task
["error_msg"] = None
925 task
["status"] = "DONE"
926 task
["vim_id"] = vim_vm_id
927 instance_element_update
= {"status": "BUILD", "vim_vm_id": vim_vm_id
, "error_msg": None}
928 return instance_element_update
930 except (vimconn
.VimConnException
, VimThreadException
) as e
:
931 self
.logger
.error("task={} new-VM: {}".format(task_id
, e
))
932 error_text
= self
._format
_vim
_error
_msg
(str(e
))
933 task
["error_msg"] = error_text
934 task
["status"] = "FAILED"
935 task
["vim_id"] = None
936 instance_element_update
= {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text
}
937 return instance_element_update
939 def del_vm(self
, task
):
940 # task_id = task["instance_action_id"] + "." + str(task["task_index"])
941 vm_vim_id
= task
["vim_id"]
942 # interfaces = task["extra"].get("interfaces", ())
944 # for iface in interfaces.values():
945 # if iface.get("sdn_port_id"):
947 # self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
948 # except ovimException as e:
949 # self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
950 # task_id, iface["sdn_port_id"], e), exc_info=True)
951 # # TODO Set error_msg at instance_nets
953 self
.vim
.delete_vminstance(vm_vim_id
, task
["extra"].get("created_items"))
954 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
955 task
["error_msg"] = None
958 except vimconn
.VimConnException
as e
:
959 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
960 if isinstance(e
, vimconn
.VimConnNotFoundException
):
961 # If not found mark as Done and fill error_msg
962 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
964 task
["status"] = "FAILED"
967 def _get_net_internal(self
, task
, filter_param
):
969 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
970 :param task: task for this find or find-or-create action
971 :param filter_param: parameters to send to the vimconnector
972 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
973 when network is not found or found more than one
975 vim_nets
= self
.vim
.get_network_list(filter_param
)
977 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param
))
978 elif len(vim_nets
) > 1:
979 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param
))
980 vim_net_id
= vim_nets
[0]["id"]
982 # Discover if this network is managed by a sdn controller
984 result
= self
.db
.get_rows(SELECT
=('sdn_net_id',), FROM
='instance_nets',
985 WHERE
={'vim_net_id': vim_net_id
, 'datacenter_tenant_id': self
.datacenter_tenant_id
},
986 ORDER
="instance_scenario_id")
988 sdn_net_id
= result
[0]['sdn_net_id']
990 task
["status"] = "DONE"
991 task
["extra"]["vim_info"] = {}
992 task
["extra"]["created"] = False
993 task
["extra"]["vim_status"] = "BUILD"
994 task
["extra"]["sdn_net_id"] = sdn_net_id
995 task
["error_msg"] = None
996 task
["vim_id"] = vim_net_id
997 instance_element_update
= {"vim_net_id": vim_net_id
, "created": False, "status": "BUILD",
998 "error_msg": None, "sdn_net_id": sdn_net_id
}
999 return instance_element_update
1001 def get_net(self
, task
):
1002 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1004 params
= task
["params"]
1005 filter_param
= params
[0]
1006 instance_element_update
= self
._get
_net
_internal
(task
, filter_param
)
1007 return instance_element_update
1009 except (vimconn
.VimConnException
, VimThreadException
) as e
:
1010 self
.logger
.error("task={} get-net: {}".format(task_id
, e
))
1011 task
["status"] = "FAILED"
1012 task
["vim_id"] = None
1013 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1014 instance_element_update
= {"vim_net_id": None, "status": "VIM_ERROR",
1015 "error_msg": task
["error_msg"]}
1016 return instance_element_update
1018 def new_net(self
, task
):
1020 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1024 if task
["extra"].get("find"):
1025 action_text
= "finding"
1026 filter_param
= task
["extra"]["find"][0]
1028 instance_element_update
= self
._get
_net
_internal
(task
, filter_param
)
1029 return instance_element_update
1030 except VimThreadExceptionNotFound
:
1033 params
= task
["params"]
1034 action_text
= "creating VIM"
1036 vim_net_id
, created_items
= self
.vim
.new_network(*params
[0:5])
1038 # net_name = params[0]
1039 # net_type = params[1]
1040 # wim_account_name = None
1041 # if len(params) >= 6:
1042 # wim_account_name = params[5]
1044 # TODO fix at nfvo adding external port
1045 # if wim_account_name and self.vim.config["wim_external_ports"]:
1046 # # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
1047 # action_text = "attaching external port to ovim network"
1048 # sdn_port_name = "external_port"
1050 # "compute_node": "__WIM:" + wim_account_name[0:58],
1052 # "vlan": network["vlan"],
1053 # "net_id": sdn_net_id,
1054 # "region": self.vim["config"]["datacenter_id"],
1055 # "name": sdn_port_name,
1058 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1059 # except ovimException:
1060 # sdn_port_data["compute_node"] = "__WIM"
1061 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1062 # self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1064 task
["status"] = "DONE"
1065 task
["extra"]["vim_info"] = {}
1066 # task["extra"]["sdn_net_id"] = sdn_net_id
1067 task
["extra"]["vim_status"] = "BUILD"
1068 task
["extra"]["created"] = True
1069 task
["extra"]["created_items"] = created_items
1070 task
["error_msg"] = None
1071 task
["vim_id"] = vim_net_id
1072 instance_element_update
= {"vim_net_id": vim_net_id
, "status": "BUILD",
1073 "created": True, "error_msg": None}
1074 return instance_element_update
1075 except vimconn
.VimConnException
as e
:
1076 self
.logger
.error("task={} new-net: Error {}: {}".format(task_id
, action_text
, e
))
1077 task
["status"] = "FAILED"
1078 task
["vim_id"] = vim_net_id
1079 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1080 # task["extra"]["sdn_net_id"] = sdn_net_id
1081 instance_element_update
= {"vim_net_id": vim_net_id
, "status": "VIM_ERROR",
1082 "error_msg": task
["error_msg"]}
1083 return instance_element_update
1085 def del_net(self
, task
):
1086 net_vim_id
= task
["vim_id"]
1087 # sdn_net_id = task["extra"].get("sdn_net_id")
1090 self
.vim
.delete_network(net_vim_id
, task
["extra"].get("created_items"))
1092 # # Delete any attached port to this sdn network. There can be ports associated to this network in case
1093 # # it was manually done using 'openmano vim-net-sdn-attach'
1094 # port_list = self.ovim.get_ports(columns={'uuid'},
1095 # filter={'name': 'external_port', 'net_id': sdn_net_id})
1096 # for port in port_list:
1097 # self.ovim.delete_port(port['uuid'], idempotent=True)
1098 # self.ovim.delete_network(sdn_net_id, idempotent=True)
1099 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1100 task
["error_msg"] = None
1102 except vimconn
.VimConnException
as e
:
1103 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1104 if isinstance(e
, vimconn
.VimConnNotFoundException
):
1105 # If not found mark as Done and fill error_msg
1106 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1108 task
["status"] = "FAILED"
1111 def new_or_update_sdn_net(self
, task
):
1112 wimconn_net_id
= task
["vim_id"]
1113 created_items
= task
["extra"].get("created_items")
1114 connected_ports
= task
["extra"].get("connected_ports", [])
1115 new_connected_ports
= []
1116 last_update
= task
["extra"].get("last_update", 0)
1117 sdn_status
= task
["extra"].get("vim_status", "BUILD")
1120 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1124 if task
["extra"].get("find"):
1125 wimconn_id
= task
["extra"]["find"][0]
1127 instance_element_update
= self
.sdnconnector
.get_connectivity_service_status(wimconn_id
)
1128 wimconn_net_id
= wimconn_id
1129 instance_element_update
= {"wim_internal_id": wimconn_net_id
, "created": False, "status": "BUILD",
1130 "error_msg": None, }
1131 return instance_element_update
1132 except Exception as e
:
1133 if isinstance(e
, SdnConnectorError
) and e
.http_error
== HTTPStatus
.NOT_FOUND
.value
:
1136 self
._proccess
_sdn
_exception
(e
)
1138 params
= task
["params"]
1145 ports
= self
.db
.get_rows(FROM
='instance_interfaces', WHERE
={'instance_wim_net_id': task
["item_id"]})
1146 sdn_need_update
= False
1148 vlan_used
= port
.get("vlan") or vlan_used
1149 # TODO. Do not connect if already done
1150 if port
.get("compute_node") and port
.get("pci"):
1151 for pmap
in self
.port_mappings
:
1152 if pmap
.get("device_id") == port
["compute_node"] and \
1153 pmap
.get("device_interface_id") == port
["pci"]:
1156 if self
.sdnconn_config
.get("mapping_not_needed"):
1158 "service_endpoint_id": "{}:{}".format(port
["compute_node"], port
["pci"]),
1159 "service_endpoint_encapsulation_info": {
1160 "vlan": port
["vlan"],
1161 "mac": port
["mac_address"],
1162 "device_id": port
["compute_node"],
1163 "device_interface_id": port
["pci"]
1168 error_list
.append("Port mapping not found for compute_node={} pci={}".format(
1169 port
["compute_node"], port
["pci"]))
1172 if port
["modified_at"] > last_update
:
1173 sdn_need_update
= True
1174 new_connected_ports
.append(port
["uuid"])
1176 "service_endpoint_id": pmap
["service_endpoint_id"],
1177 "service_endpoint_encapsulation_type": "dot1q" if port
["model"] == "SR-IOV" else None,
1178 "service_endpoint_encapsulation_info": {
1179 "vlan": port
["vlan"],
1180 "mac": port
["mac_address"],
1181 "device_id": pmap
.get("device_id"),
1182 "device_interface_id": pmap
.get("device_interface_id"),
1183 "switch_dpid": pmap
.get("switch_dpid"),
1184 "switch_port": pmap
.get("switch_port"),
1185 "service_mapping_info": pmap
.get("service_mapping_info"),
1192 error_list
.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}"
1193 .format(len(ports
)-pending_ports
, len(ports
)))
1195 # connect external ports
1196 for index
, external_port
in enumerate(task
["extra"].get("sdn-ports") or ()):
1197 external_port_id
= external_port
.get("service_endpoint_id") or str(index
)
1199 "service_endpoint_id": external_port_id
,
1200 "service_endpoint_encapsulation_type": external_port
.get("service_endpoint_encapsulation_type",
1202 "service_endpoint_encapsulation_info": {
1203 "vlan": external_port
.get("vlan") or vlan_used
,
1204 "mac": external_port
.get("mac_address"),
1205 "device_id": external_port
.get("device_id"),
1206 "device_interface_id": external_port
.get("device_interface_id"),
1207 "switch_dpid": external_port
.get("switch_dpid") or external_port
.get("switch_id"),
1208 "switch_port": external_port
.get("switch_port"),
1209 "service_mapping_info": external_port
.get("service_mapping_info"),
1211 new_connected_ports
.append(external_port_id
)
1213 # if there are more ports to connect or they have been modified, call create/update
1215 if set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1216 last_update
= time
.time()
1217 if not wimconn_net_id
:
1218 if len(sdn_ports
) < 2:
1219 if not pending_ports
:
1220 sdn_status
= "ACTIVE"
1222 if params
[0] == "data":
1224 elif params
[0] == "ptp":
1228 wimconn_net_id
, created_items
= self
.sdnconnector
.create_connectivity_service(
1229 net_type
, sdn_ports
)
1231 created_items
= self
.sdnconnector
.edit_connectivity_service(
1232 wimconn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
)
1233 connected_ports
= new_connected_ports
1234 elif wimconn_net_id
:
1235 wim_status_dict
= self
.sdnconnector
.get_connectivity_service_status(wimconn_net_id
,
1236 conn_info
=created_items
)
1237 sdn_status
= wim_status_dict
["sdn_status"]
1238 if wim_status_dict
.get("error_msg"):
1239 error_list
.append(wim_status_dict
.get("error_msg"))
1240 if wim_status_dict
.get("sdn_info"):
1241 sdn_info
= str(wim_status_dict
.get("sdn_info"))
1242 except Exception as e
:
1243 self
._proccess
_sdn
_exception
(e
)
1245 task
["status"] = "DONE"
1246 task
["extra"]["vim_info"] = {}
1247 # task["extra"]["sdn_net_id"] = sdn_net_id
1248 task
["extra"]["vim_status"] = sdn_status
1249 task
["extra"]["created"] = True
1250 task
["extra"]["created_items"] = created_items
1251 task
["extra"]["connected_ports"] = connected_ports
1252 task
["extra"]["last_update"] = last_update
1253 task
["error_msg"] = self
._format
_vim
_error
_msg
(" ; ".join(error_list
))
1254 task
["vim_id"] = wimconn_net_id
1255 instance_element_update
= {"wim_internal_id": wimconn_net_id
, "status": sdn_status
,
1256 "created": True, "error_msg": task
["error_msg"] or None}
1257 except (vimconn
.VimConnException
, SdnConnectorError
) as e
:
1258 self
.logger
.error("task={} new-sdn-net: Error: {}".format(task_id
, e
))
1259 task
["status"] = "FAILED"
1260 task
["vim_id"] = wimconn_net_id
1261 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1262 # task["extra"]["sdn_net_id"] = sdn_net_id
1263 instance_element_update
= {"wim_internal_id": wimconn_net_id
, "status": "WIM_ERROR",
1264 "error_msg": task
["error_msg"]}
1267 instance_element_update
["wim_info"] = sdn_info
1268 return instance_element_update
1270 def del_sdn_net(self
, task
):
1271 wimconn_net_id
= task
["vim_id"]
1275 self
.sdnconnector
.delete_connectivity_service(wimconn_net_id
, task
["extra"].get("created_items"))
1276 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1277 task
["error_msg"] = None
1279 except Exception as e
:
1280 self
._proccess
_sdn
_exception
(e
)
1281 except SdnConnectorError
as e
:
1282 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1283 if e
.http_code
== HTTPStatus
.NOT_FOUND
.value
:
1284 # If not found mark as Done and fill error_msg
1285 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1286 task
["error_msg"] = None
1288 task
["status"] = "FAILED"
1291 # Service Function Instances
1292 def new_sfi(self
, task
):
1295 # Waits for interfaces to be ready (avoids failure)
1297 dep_id
= "TASK-" + str(task
["extra"]["depends_on"][0])
1298 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1300 interfaces
= task
["depends"][dep_id
]["extra"].get("interfaces")
1302 ingress_interface_id
= task
.get("extra").get("params").get("ingress_interface_id")
1303 egress_interface_id
= task
.get("extra").get("params").get("egress_interface_id")
1304 ingress_vim_interface_id
= None
1305 egress_vim_interface_id
= None
1306 for vim_interface
, interface_data
in interfaces
.items():
1307 if interface_data
.get("interface_id") == ingress_interface_id
:
1308 ingress_vim_interface_id
= vim_interface
1310 if ingress_interface_id
!= egress_interface_id
:
1311 for vim_interface
, interface_data
in interfaces
.items():
1312 if interface_data
.get("interface_id") == egress_interface_id
:
1313 egress_vim_interface_id
= vim_interface
1316 egress_vim_interface_id
= ingress_vim_interface_id
1317 if not ingress_vim_interface_id
or not egress_vim_interface_id
:
1318 error_text
= "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1319 ingress_vim_interface_id
, egress_vim_interface_id
)
1320 self
.logger
.error(error_text
)
1321 task
["error_msg"] = error_text
1322 task
["status"] = "FAILED"
1323 task
["vim_id"] = None
1325 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1326 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1327 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1328 ingress_port_id_list
= [ingress_vim_interface_id
]
1329 egress_port_id_list
= [egress_vim_interface_id
]
1330 name
= "sfi-{}".format(task
["item_id"][:8])
1331 # By default no form of IETF SFC Encapsulation will be used
1332 vim_sfi_id
= self
.vim
.new_sfi(name
, ingress_port_id_list
, egress_port_id_list
, sfc_encap
=False)
1334 task
["extra"]["created"] = True
1335 task
["extra"]["vim_status"] = "ACTIVE"
1336 task
["error_msg"] = None
1337 task
["status"] = "DONE"
1338 task
["vim_id"] = vim_sfi_id
1339 instance_element_update
= {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id
, "error_msg": None}
1340 return instance_element_update
1342 except (vimconn
.VimConnException
, VimThreadException
) as e
:
1343 self
.logger
.error("Error creating Service Function Instance, task=%s: %s", task_id
, str(e
))
1344 error_text
= self
._format
_vim
_error
_msg
(str(e
))
1345 task
["error_msg"] = error_text
1346 task
["status"] = "FAILED"
1347 task
["vim_id"] = None
1348 instance_element_update
= {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text
}
1349 return instance_element_update
1351 def del_sfi(self
, task
):
1352 sfi_vim_id
= task
["vim_id"]
1354 self
.vim
.delete_sfi(sfi_vim_id
)
1355 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1356 task
["error_msg"] = None
1359 except vimconn
.VimConnException
as e
:
1360 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1361 if isinstance(e
, vimconn
.VimConnNotFoundException
):
1362 # If not found mark as Done and fill error_msg
1363 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1365 task
["status"] = "FAILED"
1368 def new_sf(self
, task
):
1371 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1373 depending_tasks
= ["TASK-" + str(dep_id
) for dep_id
in task
["extra"]["depends_on"]]
1374 # sfis = next(iter(task.get("depends").values())).get("extra").get("params")[5]
1375 sfis
= [task
.get("depends").get(dep_task
) for dep_task
in depending_tasks
]
1378 sfi_id_list
.append(sfi
.get("vim_id"))
1379 name
= "sf-{}".format(task
["item_id"][:8])
1380 # By default no form of IETF SFC Encapsulation will be used
1381 vim_sf_id
= self
.vim
.new_sf(name
, sfi_id_list
, sfc_encap
=False)
1383 task
["extra"]["created"] = True
1384 task
["extra"]["vim_status"] = "ACTIVE"
1385 task
["error_msg"] = None
1386 task
["status"] = "DONE"
1387 task
["vim_id"] = vim_sf_id
1388 instance_element_update
= {"status": "ACTIVE", "vim_sf_id": vim_sf_id
, "error_msg": None}
1389 return instance_element_update
1391 except (vimconn
.VimConnException
, VimThreadException
) as e
:
1392 self
.logger
.error("Error creating Service Function, task=%s: %s", task_id
, str(e
))
1393 error_text
= self
._format
_vim
_error
_msg
(str(e
))
1394 task
["error_msg"] = error_text
1395 task
["status"] = "FAILED"
1396 task
["vim_id"] = None
1397 instance_element_update
= {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text
}
1398 return instance_element_update
1400 def del_sf(self
, task
):
1401 sf_vim_id
= task
["vim_id"]
1403 self
.vim
.delete_sf(sf_vim_id
)
1404 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1405 task
["error_msg"] = None
1408 except vimconn
.VimConnException
as e
:
1409 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1410 if isinstance(e
, vimconn
.VimConnNotFoundException
):
1411 # If not found mark as Done and fill error_msg
1412 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1414 task
["status"] = "FAILED"
1417 def new_classification(self
, task
):
1418 vim_classification_id
= None
1420 params
= task
["params"]
1421 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1422 dep_id
= "TASK-" + str(task
["extra"]["depends_on"][0])
1424 interfaces
= task
.get("depends").get(dep_id
).get("extra").get("interfaces")
1425 # Bear in mind that different VIM connectors might support Classifications differently.
1426 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1427 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1428 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1429 # using the IPv4 flow classifier.
1430 logical_source_port_vim_id
= None
1431 logical_source_port_id
= params
.get("logical_source_port")
1432 for vim_interface
, interface_data
in interfaces
.items():
1433 if interface_data
.get("interface_id") == logical_source_port_id
:
1434 logical_source_port_vim_id
= vim_interface
1436 if not logical_source_port_vim_id
:
1437 error_text
= "Error creating Flow Classifier, Logical Source Port id {}".format(
1438 logical_source_port_id
)
1439 self
.logger
.error(error_text
)
1440 task
["error_msg"] = error_text
1441 task
["status"] = "FAILED"
1442 task
["vim_id"] = None
1445 name
= "c-{}".format(task
["item_id"][:8])
1446 # if not CIDR is given for the IP addresses, add /32:
1447 ip_proto
= int(params
.get("ip_proto"))
1448 source_ip
= params
.get("source_ip")
1449 destination_ip
= params
.get("destination_ip")
1450 source_port
= params
.get("source_port")
1451 destination_port
= params
.get("destination_port")
1452 definition
= {"logical_source_port": logical_source_port_vim_id
}
1458 elif ip_proto
== 17:
1460 definition
["protocol"] = ip_proto
1462 if '/' not in source_ip
:
1464 definition
["source_ip_prefix"] = source_ip
1466 definition
["source_port_range_min"] = source_port
1467 definition
["source_port_range_max"] = source_port
1468 if destination_port
:
1469 definition
["destination_port_range_min"] = destination_port
1470 definition
["destination_port_range_max"] = destination_port
1472 if '/' not in destination_ip
:
1473 destination_ip
+= '/32'
1474 definition
["destination_ip_prefix"] = destination_ip
1476 vim_classification_id
= self
.vim
.new_classification(
1477 name
, 'legacy_flow_classifier', definition
)
1479 task
["extra"]["created"] = True
1480 task
["extra"]["vim_status"] = "ACTIVE"
1481 task
["error_msg"] = None
1482 task
["status"] = "DONE"
1483 task
["vim_id"] = vim_classification_id
1484 instance_element_update
= {"status": "ACTIVE", "vim_classification_id": vim_classification_id
,
1486 return instance_element_update
1488 except (vimconn
.VimConnException
, VimThreadException
) as e
:
1489 self
.logger
.error("Error creating Classification, task=%s: %s", task_id
, str(e
))
1490 error_text
= self
._format
_vim
_error
_msg
(str(e
))
1491 task
["error_msg"] = error_text
1492 task
["status"] = "FAILED"
1493 task
["vim_id"] = None
1494 instance_element_update
= {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text
}
1495 return instance_element_update
1497 def del_classification(self
, task
):
1498 classification_vim_id
= task
["vim_id"]
1500 self
.vim
.delete_classification(classification_vim_id
)
1501 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1502 task
["error_msg"] = None
1505 except vimconn
.VimConnException
as e
:
1506 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1507 if isinstance(e
, vimconn
.VimConnNotFoundException
):
1508 # If not found mark as Done and fill error_msg
1509 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1511 task
["status"] = "FAILED"
1514 def new_sfp(self
, task
):
1517 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1518 depending_tasks
= [task
.get("depends").get("TASK-" + str(tsk_id
)) for tsk_id
in
1519 task
.get("extra").get("depends_on")]
1522 classification_id_list
= []
1523 for dep
in depending_tasks
:
1524 vim_id
= dep
.get("vim_id")
1525 resource
= dep
.get("item")
1526 if resource
== "instance_sfs":
1527 sf_id_list
.append(vim_id
)
1528 elif resource
== "instance_classifications":
1529 classification_id_list
.append(vim_id
)
1531 name
= "sfp-{}".format(task
["item_id"][:8])
1532 # By default no form of IETF SFC Encapsulation will be used
1533 vim_sfp_id
= self
.vim
.new_sfp(name
, classification_id_list
, sf_id_list
, sfc_encap
=False)
1535 task
["extra"]["created"] = True
1536 task
["extra"]["vim_status"] = "ACTIVE"
1537 task
["error_msg"] = None
1538 task
["status"] = "DONE"
1539 task
["vim_id"] = vim_sfp_id
1540 instance_element_update
= {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id
, "error_msg": None}
1541 return instance_element_update
1543 except (vimconn
.VimConnException
, VimThreadException
) as e
:
1544 self
.logger
.error("Error creating Service Function, task=%s: %s", task_id
, str(e
))
1545 error_text
= self
._format
_vim
_error
_msg
(str(e
))
1546 task
["error_msg"] = error_text
1547 task
["status"] = "FAILED"
1548 task
["vim_id"] = None
1549 instance_element_update
= {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text
}
1550 return instance_element_update
1552 def del_sfp(self
, task
):
1553 sfp_vim_id
= task
["vim_id"]
1555 self
.vim
.delete_sfp(sfp_vim_id
)
1556 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1557 task
["error_msg"] = None
1560 except vimconn
.VimConnException
as e
:
1561 task
["error_msg"] = self
._format
_vim
_error
_msg
(str(e
))
1562 if isinstance(e
, vimconn
.VimConnNotFoundException
):
1563 # If not found mark as Done and fill error_msg
1564 task
["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1566 task
["status"] = "FAILED"
1569 def _refres_sfps(self
, task
):
1570 """Call VIM to get SFPs status"""
1571 database_update
= None
1573 vim_id
= task
["vim_id"]
1574 sfp_to_refresh_list
= [vim_id
]
1575 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1577 vim_dict
= self
.vim
.refresh_sfps_status(sfp_to_refresh_list
)
1578 vim_info
= vim_dict
[vim_id
]
1579 except vimconn
.VimConnException
as e
:
1580 # Mark all tasks at VIM_ERROR status
1581 self
.logger
.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id
, e
))
1582 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
1584 self
.logger
.debug("task={} get-sfp: vim_sfp_id={} result={}".format(task_id
, task
["vim_id"], vim_info
))
1585 #TODO: Revise this part
1586 vim_info_error_msg
= None
1587 if vim_info
.get("error_msg"):
1588 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info
["error_msg"])
1589 task_vim_info
= task
["extra"].get("vim_info")
1590 task_error_msg
= task
.get("error_msg")
1591 task_vim_status
= task
["extra"].get("vim_status")
1592 if task_vim_status
!= vim_info
["status"] or task_error_msg
!= vim_info_error_msg
or \
1593 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
1594 database_update
= {"status": vim_info
["status"], "error_msg": vim_info_error_msg
}
1595 if vim_info
.get("vim_info"):
1596 database_update
["vim_info"] = vim_info
["vim_info"]
1598 task
["extra"]["vim_status"] = vim_info
["status"]
1599 task
["error_msg"] = vim_info_error_msg
1600 if vim_info
.get("vim_info"):
1601 task
["extra"]["vim_info"] = vim_info
["vim_info"]
1603 return database_update
1605 def _refres_sfis(self
, task
):
1606 """Call VIM to get sfis status"""
1607 database_update
= None
1609 vim_id
= task
["vim_id"]
1610 sfi_to_refresh_list
= [vim_id
]
1611 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1613 vim_dict
= self
.vim
.refresh_sfis_status(sfi_to_refresh_list
)
1614 vim_info
= vim_dict
[vim_id
]
1615 except vimconn
.VimConnException
as e
:
1616 # Mark all tasks at VIM_ERROR status
1617 self
.logger
.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id
, e
))
1618 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
1620 self
.logger
.debug("task={} get-sfi: vim_sfi_id={} result={}".format(task_id
, task
["vim_id"], vim_info
))
1621 #TODO: Revise this part
1622 vim_info_error_msg
= None
1623 if vim_info
.get("error_msg"):
1624 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info
["error_msg"])
1625 task_vim_info
= task
["extra"].get("vim_info")
1626 task_error_msg
= task
.get("error_msg")
1627 task_vim_status
= task
["extra"].get("vim_status")
1628 if task_vim_status
!= vim_info
["status"] or task_error_msg
!= vim_info_error_msg
or \
1629 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
1630 database_update
= {"status": vim_info
["status"], "error_msg": vim_info_error_msg
}
1631 if vim_info
.get("vim_info"):
1632 database_update
["vim_info"] = vim_info
["vim_info"]
1634 task
["extra"]["vim_status"] = vim_info
["status"]
1635 task
["error_msg"] = vim_info_error_msg
1636 if vim_info
.get("vim_info"):
1637 task
["extra"]["vim_info"] = vim_info
["vim_info"]
1639 return database_update
1641 def _refres_sfs(self
, task
):
1642 """Call VIM to get sfs status"""
1643 database_update
= None
1645 vim_id
= task
["vim_id"]
1646 sf_to_refresh_list
= [vim_id
]
1647 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1649 vim_dict
= self
.vim
.refresh_sfs_status(sf_to_refresh_list
)
1650 vim_info
= vim_dict
[vim_id
]
1651 except vimconn
.VimConnException
as e
:
1652 # Mark all tasks at VIM_ERROR status
1653 self
.logger
.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id
, e
))
1654 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
1656 self
.logger
.debug("task={} get-sf: vim_sf_id={} result={}".format(task_id
, task
["vim_id"], vim_info
))
1657 #TODO: Revise this part
1658 vim_info_error_msg
= None
1659 if vim_info
.get("error_msg"):
1660 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info
["error_msg"])
1661 task_vim_info
= task
["extra"].get("vim_info")
1662 task_error_msg
= task
.get("error_msg")
1663 task_vim_status
= task
["extra"].get("vim_status")
1664 if task_vim_status
!= vim_info
["status"] or task_error_msg
!= vim_info_error_msg
or \
1665 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
1666 database_update
= {"status": vim_info
["status"], "error_msg": vim_info_error_msg
}
1667 if vim_info
.get("vim_info"):
1668 database_update
["vim_info"] = vim_info
["vim_info"]
1670 task
["extra"]["vim_status"] = vim_info
["status"]
1671 task
["error_msg"] = vim_info_error_msg
1672 if vim_info
.get("vim_info"):
1673 task
["extra"]["vim_info"] = vim_info
["vim_info"]
1675 return database_update
1677 def _refres_classifications(self
, task
):
1678 """Call VIM to get classifications status"""
1679 database_update
= None
1681 vim_id
= task
["vim_id"]
1682 classification_to_refresh_list
= [vim_id
]
1683 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
1685 vim_dict
= self
.vim
.refresh_classifications_status(classification_to_refresh_list
)
1686 vim_info
= vim_dict
[vim_id
]
1687 except vimconn
.VimConnException
as e
:
1688 # Mark all tasks at VIM_ERROR status
1689 self
.logger
.error("task={} get-classification: vimconnException when trying to refresh classifications {}"
1690 .format(task_id
, e
))
1691 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
1693 self
.logger
.debug("task={} get-classification: vim_classification_id={} result={}".format(task_id
,
1694 task
["vim_id"], vim_info
))
1695 #TODO: Revise this part
1696 vim_info_error_msg
= None
1697 if vim_info
.get("error_msg"):
1698 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info
["error_msg"])
1699 task_vim_info
= task
["extra"].get("vim_info")
1700 task_error_msg
= task
.get("error_msg")
1701 task_vim_status
= task
["extra"].get("vim_status")
1702 if task_vim_status
!= vim_info
["status"] or task_error_msg
!= vim_info_error_msg
or \
1703 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
1704 database_update
= {"status": vim_info
["status"], "error_msg": vim_info_error_msg
}
1705 if vim_info
.get("vim_info"):
1706 database_update
["vim_info"] = vim_info
["vim_info"]
1708 task
["extra"]["vim_status"] = vim_info
["status"]
1709 task
["error_msg"] = vim_info_error_msg
1710 if vim_info
.get("vim_info"):
1711 task
["extra"]["vim_info"] = vim_info
["vim_info"]
1713 return database_update