Merge branch 'py3' features 8029 8030
[osm/RO.git] / RO / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import queue
82 import logging
83 from osm_ro import vimconn
84 from osm_ro.wim.sdnconn import SdnConnectorError
85 import yaml
86 from osm_ro.db_base import db_base_Exception
87 from http import HTTPStatus
88 from copy import deepcopy
89
90 __author__ = "Alfonso Tierno, Pablo Montes"
91 __date__ = "$28-Sep-2017 12:07:15$"
92
93
94 def is_task_id(task_id):
95 return task_id.startswith("TASK-")
96
97
98 class VimThreadException(Exception):
99 pass
100
101
102 class VimThreadExceptionNotFound(VimThreadException):
103 pass
104
105
106 class vim_thread(threading.Thread):
107 REFRESH_BUILD = 5 # 5 seconds
108 REFRESH_ACTIVE = 60 # 1 minute
109 REFRESH_ERROR = 600
110 REFRESH_DELETE = 3600 * 10
111
112 def __init__(self, task_lock, plugins, name=None, wim_account_id=None, datacenter_tenant_id=None, db=None):
113 """Init a thread.
114 Arguments:
115 'id' number of thead
116 'name' name of thread
117 'host','user': host ip or name to manage and user
118 'db', 'db_lock': database class and lock to use it in exclusion
119 """
120 threading.Thread.__init__(self)
121 self.plugins = plugins
122 self.plugin_name = "unknown"
123 self.vim = None
124 self.sdnconnector = None
125 self.sdnconn_config = None
126 self.error_status = None
127 self.wim_account_id = wim_account_id
128 self.datacenter_tenant_id = datacenter_tenant_id
129 self.port_mapping = None
130 if self.wim_account_id:
131 self.target_k = "wim_account_id"
132 self.target_v = self.wim_account_id
133 else:
134 self.target_k = "datacenter_vim_id"
135 self.target_v = self.datacenter_tenant_id
136 if not name:
137 self.name = wim_account_id or str(datacenter_tenant_id)
138 else:
139 self.name = name
140 self.vim_persistent_info = {}
141 self.my_id = self.name[:64]
142
143 self.logger = logging.getLogger('openmano.{}.{}'.format("vim" if self.datacenter_tenant_id else "sdn",
144 self.name))
145 self.db = db
146
147 self.task_lock = task_lock
148 self.task_queue = queue.Queue(2000)
149
150 def _proccess_sdn_exception(self, exc):
151 if isinstance(exc, SdnConnectorError):
152 raise
153 else:
154 self.logger.error("plugin={} throws a non SdnConnectorError exception {}".format(self.plugin_name, exc),
155 exc_info=True)
156 raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
157
158 def _proccess_vim_exception(self, exc):
159 if isinstance(exc, vimconn.vimconnException):
160 raise
161 else:
162 self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc),
163 exc_info=True)
164 raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
165
166 def get_vim_sdn_connector(self):
167 if self.datacenter_tenant_id:
168 try:
169 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
170 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
171 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
172 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
173 'user', 'passwd', 'dt.config as dt_config')
174 where_ = {"dt.uuid": self.datacenter_tenant_id}
175 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
176 vim = vims[0]
177 vim_config = {}
178 if vim["config"]:
179 vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader))
180 if vim["dt_config"]:
181 vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader))
182 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
183 vim_config['datacenter_id'] = vim.get('datacenter_id')
184
185 # get port_mapping
186 # vim_port_mappings = self.ovim.get_of_port_mappings(
187 # db_filter={"datacenter_id": vim_config['datacenter_id']})
188 # vim_config["wim_external_ports"] = [x for x in vim_port_mappings
189 # if x["service_mapping_info"].get("wim")]
190 self.plugin_name = "rovim_" + vim["type"]
191 self.vim = self.plugins[self.plugin_name].vimconnector(
192 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
193 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
194 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
195 user=vim['user'], passwd=vim['passwd'],
196 config=vim_config, persistent_info=self.vim_persistent_info
197 )
198 self.error_status = None
199 self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
200 self.datacenter_tenant_id, self.plugin_name))
201 except Exception as e:
202 self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format(
203 self.datacenter_tenant_id, self.plugin_name, e))
204 self.vim = None
205 self.error_status = "Error loading vimconnector: {}".format(e)
206 else:
207 try:
208 wim_account = self.db.get_rows(FROM="wim_accounts", WHERE={"uuid": self.wim_account_id})[0]
209 wim = self.db.get_rows(FROM="wims", WHERE={"uuid": wim_account["wim_id"]})[0]
210 if wim["config"]:
211 self.sdnconn_config = yaml.load(wim["config"], Loader=yaml.Loader)
212 else:
213 self.sdnconn_config = {}
214 if wim_account["config"]:
215 self.sdnconn_config.update(yaml.load(wim_account["config"], Loader=yaml.Loader))
216 self.port_mappings = self.db.get_rows(FROM="wim_port_mappings", WHERE={"wim_id": wim_account["wim_id"]})
217 if self.port_mappings:
218 self.sdnconn_config["service_endpoint_mapping"] = self.port_mappings
219 self.plugin_name = "rosdn_" + wim["type"]
220 self.sdnconnector = self.plugins[self.plugin_name](
221 wim, wim_account, config=self.sdnconn_config)
222 self.error_status = None
223 self.logger.info("Sdn Connector loaded for wim_account={}, plugin={}".format(
224 self.wim_account_id, self.plugin_name))
225 except Exception as e:
226 self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format(
227 self.wim_account_id, self.plugin_name, e))
228 self.sdnconnector = None
229 self.error_status = "Error loading sdn connector: {}".format(e)
230
231 def _get_db_task(self):
232 """
233 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
234 :return: None
235 """
236 now = time.time()
237 try:
238 database_limit = 20
239 task_related = None
240 while True:
241 # get 20 (database_limit) entries each time
242 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
243 WHERE={self.target_k: self.target_v,
244 "status": ['SCHEDULED', 'BUILD', 'DONE'],
245 "worker": [None, self.my_id], "modified_at<=": now
246 },
247 ORDER_BY=("modified_at", "created_at",),
248 LIMIT=database_limit)
249 if not vim_actions:
250 return None, None
251 # if vim_actions[0]["modified_at"] > now:
252 # return int(vim_actions[0] - now)
253 for task in vim_actions:
254 # block related task
255 if task_related == task["related"]:
256 continue # ignore if a locking has already tried for these task set
257 task_related = task["related"]
258 # lock ...
259 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
260 WHERE={self.target_k: self.target_v,
261 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
262 "worker": [None, self.my_id],
263 "related": task_related,
264 "item": task["item"],
265 })
266 # ... and read all related and check if locked
267 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
268 WHERE={self.target_k: self.target_v,
269 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
270 "related": task_related,
271 "item": task["item"],
272 },
273 ORDER_BY=("created_at",))
274 # check that all related tasks have been locked. If not release and try again. It can happen
275 # for race conditions if a new related task has been inserted by nfvo in the process
276 some_tasks_locked = False
277 some_tasks_not_locked = False
278 creation_task = None
279 for relate_task in related_tasks:
280 if relate_task["worker"] != self.my_id:
281 some_tasks_not_locked = True
282 else:
283 some_tasks_locked = True
284 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
285 creation_task = relate_task
286 if some_tasks_not_locked:
287 if some_tasks_locked: # unlock
288 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
289 WHERE={self.target_k: self.target_v,
290 "worker": self.my_id,
291 "related": task_related,
292 "item": task["item"],
293 })
294 continue
295
296 # task of creation must be the first in the list of related_task
297 assert(related_tasks[0]["action"] in ("CREATE", "FIND"))
298
299 task["params"] = None
300 if task["extra"]:
301 extra = yaml.load(task["extra"], Loader=yaml.Loader)
302 else:
303 extra = {}
304 task["extra"] = extra
305 if extra.get("depends_on"):
306 task["depends"] = {}
307 if extra.get("params"):
308 task["params"] = deepcopy(extra["params"])
309 return task, related_tasks
310 except Exception as e:
311 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
312 return None, None
313
314 def _delete_task(self, task):
315 """
316 Determine if this task need to be done or superseded
317 :return: None
318 """
319
320 def copy_extra_created(copy_to, copy_from):
321 copy_to["created"] = copy_from["created"]
322 if copy_from.get("sdn_net_id"):
323 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
324 if copy_from.get("interfaces"):
325 copy_to["interfaces"] = copy_from["interfaces"]
326 if copy_from.get("created_items"):
327 if not copy_to.get("created_items"):
328 copy_to["created_items"] = {}
329 copy_to["created_items"].update(copy_from["created_items"])
330
331 task_create = None
332 dependency_task = None
333 deletion_needed = False
334 if task["status"] == "FAILED":
335 return # TODO need to be retry??
336 try:
337 # get all related tasks
338 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
339 WHERE={self.target_k: self.target_v,
340 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
341 "action": ["FIND", "CREATE"],
342 "related": task["related"],
343 },
344 ORDER_BY=("created_at",),
345 )
346 for related_task in related_tasks:
347 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
348 task_create = related_task
349 # TASK_CREATE
350 if related_task["extra"]:
351 extra_created = yaml.load(related_task["extra"], Loader=yaml.Loader)
352 if extra_created.get("created"):
353 deletion_needed = True
354 related_task["extra"] = extra_created
355 elif not dependency_task:
356 dependency_task = related_task
357 if task_create and dependency_task:
358 break
359
360 # mark task_create as FINISHED
361 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
362 WHERE={self.target_k: self.target_v,
363 "instance_action_id": task_create["instance_action_id"],
364 "task_index": task_create["task_index"]
365 })
366 if not deletion_needed:
367 return
368 elif dependency_task:
369 # move create information from task_create to relate_task
370 extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {}
371 extra_new_created["created"] = extra_created["created"]
372 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
373
374 self.db.update_rows("vim_wim_actions",
375 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
376 width=256),
377 "vim_id": task_create.get("vim_id")},
378 WHERE={self.target_k: self.target_v,
379 "instance_action_id": dependency_task["instance_action_id"],
380 "task_index": dependency_task["task_index"]
381 })
382 return False
383 else:
384 task["vim_id"] = task_create["vim_id"]
385 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
386 return True
387
388 except Exception as e:
389 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
390
391 def _refres_vm(self, task):
392 """Call VIM to get VMs status"""
393 database_update = None
394
395 vim_id = task["vim_id"]
396 vm_to_refresh_list = [vim_id]
397 try:
398 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
399 vim_info = vim_dict[vim_id]
400 except vimconn.vimconnException as e:
401 # Mark all tasks at VIM_ERROR status
402 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
403 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
404
405 task_id = task["instance_action_id"] + "." + str(task["task_index"])
406 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
407
408 # check and update interfaces
409 task_warning_msg = ""
410 for interface in vim_info.get("interfaces", ()):
411 vim_interface_id = interface["vim_interface_id"]
412 if vim_interface_id not in task["extra"]["interfaces"]:
413 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
414 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
415 continue
416 task_interface = task["extra"]["interfaces"][vim_interface_id]
417 task_vim_interface = task_interface.get("vim_info")
418 if task_vim_interface != interface:
419 # delete old port
420 # if task_interface.get("sdn_port_id"):
421 # try:
422 # self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
423 # task_interface["sdn_port_id"] = None
424 # except ovimException as e:
425 # error_text = "ovimException deleting external_port={}: {}".format(
426 # task_interface["sdn_port_id"], e)
427 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
428 # task_warning_msg += error_text
429 # # TODO Set error_msg at instance_nets instead of instance VMs
430
431 # Create SDN port
432 # sdn_net_id = task_interface.get("sdn_net_id")
433 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
434 # sdn_port_name = sdn_net_id + "." + task["vim_id"]
435 # sdn_port_name = sdn_port_name[:63]
436 # try:
437 # sdn_port_id = self.ovim.new_external_port(
438 # {"compute_node": interface["compute_node"],
439 # "pci": interface["pci"],
440 # "vlan": interface.get("vlan"),
441 # "net_id": sdn_net_id,
442 # "region": self.vim["config"]["datacenter_id"],
443 # "name": sdn_port_name,
444 # "mac": interface.get("mac_address")})
445 # task_interface["sdn_port_id"] = sdn_port_id
446 # except (ovimException, Exception) as e:
447 # error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
448 # format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
449 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
450 # task_warning_msg += error_text
451 # # TODO Set error_msg at instance_nets instead of instance VMs
452
453 self.db.update_rows('instance_interfaces',
454 UPDATE={"mac_address": interface.get("mac_address"),
455 "ip_address": interface.get("ip_address"),
456 "vim_interface_id": interface.get("vim_interface_id"),
457 "vim_info": interface.get("vim_info"),
458 "sdn_port_id": task_interface.get("sdn_port_id"),
459 "compute_node": interface.get("compute_node"),
460 "pci": interface.get("pci"),
461 "vlan": interface.get("vlan")},
462 WHERE={'uuid': task_interface["iface_id"]})
463 task_interface["vim_info"] = interface
464 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
465 # # TODO Send message to task SDN to update
466
467 # check and update task and instance_vms database
468 vim_info_error_msg = None
469 if vim_info.get("error_msg"):
470 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
471 elif task_warning_msg:
472 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
473 task_vim_info = task["extra"].get("vim_info")
474 task_error_msg = task.get("error_msg")
475 task_vim_status = task["extra"].get("vim_status")
476 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
477 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
478 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
479 if vim_info.get("vim_info"):
480 database_update["vim_info"] = vim_info["vim_info"]
481
482 task["extra"]["vim_status"] = vim_info["status"]
483 task["error_msg"] = vim_info_error_msg
484 if vim_info.get("vim_info"):
485 task["extra"]["vim_info"] = vim_info["vim_info"]
486
487 return database_update
488
489 def _refres_net(self, task):
490 """Call VIM to get network status"""
491 database_update = None
492
493 vim_id = task["vim_id"]
494 net_to_refresh_list = [vim_id]
495 try:
496 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
497 vim_info = vim_dict[vim_id]
498 except vimconn.vimconnException as e:
499 # Mark all tasks at VIM_ERROR status
500 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
501 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
502
503 task_id = task["instance_action_id"] + "." + str(task["task_index"])
504 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
505
506 task_vim_info = task["extra"].get("vim_info")
507 task_vim_status = task["extra"].get("vim_status")
508 task_error_msg = task.get("error_msg")
509 # task_sdn_net_id = task["extra"].get("sdn_net_id")
510
511 vim_info_status = vim_info["status"]
512 vim_info_error_msg = vim_info.get("error_msg")
513 # get ovim status
514 # if task_sdn_net_id:
515 # try:
516 # sdn_net = self.ovim.show_network(task_sdn_net_id)
517 # except (ovimException, Exception) as e:
518 # text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
519 # self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
520 # sdn_net = {"status": "ERROR", "last_error": text_error}
521 # if sdn_net["status"] == "ERROR":
522 # if not vim_info_error_msg:
523 # vim_info_error_msg = str(sdn_net.get("last_error"))
524 # else:
525 # vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
526 # self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
527 # self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
528 # vim_info_status = "ERROR"
529 # elif sdn_net["status"] == "BUILD":
530 # if vim_info_status == "ACTIVE":
531 # vim_info_status = "BUILD"
532
533 # update database
534 if vim_info_error_msg:
535 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
536 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
537 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
538 task["extra"]["vim_status"] = vim_info_status
539 task["error_msg"] = vim_info_error_msg
540 if vim_info.get("vim_info"):
541 task["extra"]["vim_info"] = vim_info["vim_info"]
542 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
543 if vim_info.get("vim_info"):
544 database_update["vim_info"] = vim_info["vim_info"]
545 return database_update
546
547 def _proccess_pending_tasks(self, task, related_tasks):
548 old_task_status = task["status"]
549 create_or_find = False # if as result of processing this task something is created or found
550 next_refresh = 0
551
552 try:
553 if task["status"] == "SCHEDULED":
554 # check if tasks that this depends on have been completed
555 dependency_not_completed = False
556 dependency_modified_at = 0
557 for task_index in task["extra"].get("depends_on", ()):
558 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
559 if not task_dependency:
560 raise VimThreadException(
561 "Cannot get depending net task trying to get depending task {}.{}".format(
562 task["instance_action_id"], task_index))
563 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
564 # database must be look again
565 if task_dependency["status"] == "SCHEDULED":
566 dependency_not_completed = True
567 dependency_modified_at = task_dependency["modified_at"]
568 break
569 elif task_dependency["status"] == "FAILED":
570 raise VimThreadException(
571 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
572 task["action"], task["item"],
573 task["instance_action_id"], task["task_index"],
574 task_dependency["instance_action_id"], task_dependency["task_index"],
575 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
576
577 task["depends"]["TASK-"+str(task_index)] = task_dependency
578 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
579 if dependency_not_completed:
580 # Move this task to the time dependency is going to be modified plus 10 seconds.
581 self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
582 UPDATE={"worker": None},
583 WHERE={self.target_k: self.target_v, "worker": self.my_id,
584 "related": task["related"],
585 })
586 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
587 # if task["extra"]["tries"] > 3:
588 # raise VimThreadException(
589 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
590 # "(task {}.{})".format(task["action"], task["item"],
591 # task["instance_action_id"], task["task_index"],
592 # task_dependency["instance_action_id"], task_dependency["task_index"]
593 # task_dependency["action"], task_dependency["item"]))
594 return
595
596 database_update = None
597 if task["action"] == "DELETE":
598 deleted_needed = self._delete_task(task)
599 if not deleted_needed:
600 task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
601 task["error_msg"] = None
602
603 if task["status"] == "SUPERSEDED":
604 # not needed to do anything but update database with the new status
605 database_update = None
606 elif not self.vim and not self.sdnconnector:
607 task["status"] = "FAILED"
608 task["error_msg"] = self.error_status
609 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
610 elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
611 # Do nothing, just copy values from one to another and updata database
612 task["status"] = related_tasks[0]["status"]
613 task["error_msg"] = related_tasks[0]["error_msg"]
614 task["vim_id"] = related_tasks[0]["vim_id"]
615 extra = yaml.load(related_tasks[0]["extra"], Loader=yaml.Loader)
616 task["extra"]["vim_status"] = extra.get("vim_status")
617 next_refresh = related_tasks[0]["modified_at"] + 0.001
618 database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
619 "error_msg": task["error_msg"]}
620 if task["item"] == 'instance_vms':
621 database_update["vim_vm_id"] = task["vim_id"]
622 elif task["item"] == 'instance_nets':
623 database_update["vim_net_id"] = task["vim_id"]
624 elif task["item"] == 'instance_vms':
625 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
626 database_update = self._refres_vm(task)
627 create_or_find = True
628 elif task["action"] == "CREATE":
629 create_or_find = True
630 database_update = self.new_vm(task)
631 elif task["action"] == "DELETE":
632 self.del_vm(task)
633 else:
634 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
635 elif task["item"] == 'instance_nets':
636 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
637 database_update = self._refres_net(task)
638 create_or_find = True
639 elif task["action"] == "CREATE":
640 create_or_find = True
641 database_update = self.new_net(task)
642 elif task["action"] == "DELETE":
643 self.del_net(task)
644 elif task["action"] == "FIND":
645 database_update = self.get_net(task)
646 else:
647 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
648 elif task["item"] == 'instance_wim_nets':
649 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
650 database_update = self.new_or_update_sdn_net(task)
651 create_or_find = True
652 elif task["action"] == "CREATE":
653 create_or_find = True
654 database_update = self.new_or_update_sdn_net(task)
655 elif task["action"] == "DELETE":
656 self.del_sdn_net(task)
657 elif task["action"] == "FIND":
658 database_update = self.get_sdn_net(task)
659 else:
660 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
661 elif task["item"] == 'instance_sfis':
662 if task["action"] == "CREATE":
663 create_or_find = True
664 database_update = self.new_sfi(task)
665 elif task["action"] == "DELETE":
666 self.del_sfi(task)
667 else:
668 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
669 elif task["item"] == 'instance_sfs':
670 if task["action"] == "CREATE":
671 create_or_find = True
672 database_update = self.new_sf(task)
673 elif task["action"] == "DELETE":
674 self.del_sf(task)
675 else:
676 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
677 elif task["item"] == 'instance_classifications':
678 if task["action"] == "CREATE":
679 create_or_find = True
680 database_update = self.new_classification(task)
681 elif task["action"] == "DELETE":
682 self.del_classification(task)
683 else:
684 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
685 elif task["item"] == 'instance_sfps':
686 if task["action"] == "CREATE":
687 create_or_find = True
688 database_update = self.new_sfp(task)
689 elif task["action"] == "DELETE":
690 self.del_sfp(task)
691 else:
692 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
693 else:
694 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
695 # TODO
696 except VimThreadException as e:
697 task["error_msg"] = str(e)
698 task["status"] = "FAILED"
699 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
700 if task["item"] == 'instance_vms':
701 database_update["vim_vm_id"] = None
702 elif task["item"] == 'instance_nets':
703 database_update["vim_net_id"] = None
704
705 task_id = task["instance_action_id"] + "." + str(task["task_index"])
706 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
707 task_id, task["item"], task["action"], task["status"],
708 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
709 try:
710 if not next_refresh:
711 if task["status"] == "DONE":
712 next_refresh = time.time()
713 if task["extra"].get("vim_status") == "BUILD":
714 next_refresh += self.REFRESH_BUILD
715 elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"):
716 next_refresh += self.REFRESH_ERROR
717 elif task["extra"].get("vim_status") == "DELETED":
718 next_refresh += self.REFRESH_DELETE
719 else:
720 next_refresh += self.REFRESH_ACTIVE
721 elif task["status"] == "FAILED":
722 next_refresh = time.time() + self.REFRESH_DELETE
723
724 if create_or_find:
725 # modify all related task with action FIND/CREATED non SCHEDULED
726 self.db.update_rows(
727 table="vim_wim_actions", modified_time=next_refresh + 0.001,
728 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
729 "error_msg": task["error_msg"],
730 },
731
732 WHERE={self.target_k: self.target_v,
733 "worker": self.my_id,
734 "action": ["FIND", "CREATE"],
735 "related": task["related"],
736 "status<>": "SCHEDULED",
737 })
738 # modify own task
739 self.db.update_rows(
740 table="vim_wim_actions", modified_time=next_refresh,
741 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
742 "error_msg": task["error_msg"],
743 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
744 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
745 # Unlock tasks
746 self.db.update_rows(
747 table="vim_wim_actions", modified_time=0,
748 UPDATE={"worker": None},
749 WHERE={self.target_k: self.target_v,
750 "worker": self.my_id,
751 "related": task["related"],
752 })
753
754 # Update table instance_actions
755 if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
756 self.db.update_rows(
757 table="instance_actions",
758 UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
759 WHERE={"uuid": task["instance_action_id"]})
760 if database_update:
761 where_filter = {"related": task["related"]}
762 if task["item"] == "instance_nets" and task["datacenter_vim_id"]:
763 where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"]
764 self.db.update_rows(table=task["item"],
765 UPDATE=database_update,
766 WHERE=where_filter)
767 except db_base_Exception as e:
768 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
769
770 def insert_task(self, task):
771 try:
772 self.task_queue.put(task, False)
773 return None
774 except queue.Full:
775 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
776
777 def del_task(self, task):
778 with self.task_lock:
779 if task["status"] == "SCHEDULED":
780 task["status"] = "SUPERSEDED"
781 return True
782 else: # task["status"] == "processing"
783 self.task_lock.release()
784 return False
785
786 def run(self):
787 self.logger.debug("Starting")
788 while True:
789 self.get_vim_sdn_connector()
790 self.logger.debug("Vimconnector loaded")
791 reload_thread = False
792
793 while True:
794 try:
795 while not self.task_queue.empty():
796 task = self.task_queue.get()
797 if isinstance(task, list):
798 pass
799 elif isinstance(task, str):
800 if task == 'exit':
801 return 0
802 elif task == 'reload':
803 reload_thread = True
804 break
805 self.task_queue.task_done()
806 if reload_thread:
807 break
808
809 task, related_tasks = self._get_db_task()
810 if task:
811 self._proccess_pending_tasks(task, related_tasks)
812 else:
813 time.sleep(5)
814
815 except Exception as e:
816 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
817
818 self.logger.debug("Finishing")
819
820 def _look_for_task(self, instance_action_id, task_id):
821 """
822 Look for a concrete task at vim_actions database table
823 :param instance_action_id: The instance_action_id
824 :param task_id: Can have several formats:
825 <task index>: integer
826 TASK-<task index> :backward compatibility,
827 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
828 :return: Task dictionary or None if not found
829 """
830 if isinstance(task_id, int):
831 task_index = task_id
832 else:
833 if task_id.startswith("TASK-"):
834 task_id = task_id[5:]
835 ins_action_id, _, task_index = task_id.rpartition(".")
836 if ins_action_id:
837 instance_action_id = ins_action_id
838
839 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
840 "task_index": task_index})
841 if not tasks:
842 return None
843 task = tasks[0]
844 task["params"] = None
845 task["depends"] = {}
846 if task["extra"]:
847 extra = yaml.load(task["extra"], Loader=yaml.Loader)
848 task["extra"] = extra
849 task["params"] = extra.get("params")
850 else:
851 task["extra"] = {}
852 return task
853
854 @staticmethod
855 def _format_vim_error_msg(error_text, max_length=1024):
856 if error_text and len(error_text) >= max_length:
857 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
858 return error_text
859
860 def new_vm(self, task):
861 task_id = task["instance_action_id"] + "." + str(task["task_index"])
862 try:
863 params = task["params"]
864 depends = task.get("depends")
865 net_list = params[5]
866 for net in net_list:
867 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
868 network_id = task["depends"][net["net_id"]].get("vim_id")
869 if not network_id:
870 raise VimThreadException(
871 "Cannot create VM because depends on a network not created or found: " +
872 str(depends[net["net_id"]]["error_msg"]))
873 net["net_id"] = network_id
874 params_copy = deepcopy(params)
875 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
876
877 # fill task_interfaces. Look for snd_net_id at database for each interface
878 task_interfaces = {}
879 for iface in params_copy[5]:
880 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
881 result = self.db.get_rows(
882 SELECT=('sdn_net_id', 'interface_id'),
883 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
884 WHERE={'ii.uuid': iface["uuid"]})
885 if result:
886 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
887 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
888 else:
889 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
890 iface["uuid"]),
891 exc_info=True)
892
893 task["vim_info"] = {}
894 task["extra"]["interfaces"] = task_interfaces
895 task["extra"]["created"] = True
896 task["extra"]["created_items"] = created_items
897 task["extra"]["vim_status"] = "BUILD"
898 task["error_msg"] = None
899 task["status"] = "DONE"
900 task["vim_id"] = vim_vm_id
901 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
902 return instance_element_update
903
904 except (vimconn.vimconnException, VimThreadException) as e:
905 self.logger.error("task={} new-VM: {}".format(task_id, e))
906 error_text = self._format_vim_error_msg(str(e))
907 task["error_msg"] = error_text
908 task["status"] = "FAILED"
909 task["vim_id"] = None
910 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
911 return instance_element_update
912
913 def del_vm(self, task):
914 # task_id = task["instance_action_id"] + "." + str(task["task_index"])
915 vm_vim_id = task["vim_id"]
916 # interfaces = task["extra"].get("interfaces", ())
917 try:
918 # for iface in interfaces.values():
919 # if iface.get("sdn_port_id"):
920 # try:
921 # self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
922 # except ovimException as e:
923 # self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
924 # task_id, iface["sdn_port_id"], e), exc_info=True)
925 # # TODO Set error_msg at instance_nets
926
927 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
928 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
929 task["error_msg"] = None
930 return None
931
932 except vimconn.vimconnException as e:
933 task["error_msg"] = self._format_vim_error_msg(str(e))
934 if isinstance(e, vimconn.vimconnNotFoundException):
935 # If not found mark as Done and fill error_msg
936 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
937 return None
938 task["status"] = "FAILED"
939 return None
940
941 def _get_net_internal(self, task, filter_param):
942 """
943 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
944 :param task: task for this find or find-or-create action
945 :param filter_param: parameters to send to the vimconnector
946 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
947 when network is not found or found more than one
948 """
949 vim_nets = self.vim.get_network_list(filter_param)
950 if not vim_nets:
951 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
952 elif len(vim_nets) > 1:
953 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
954 vim_net_id = vim_nets[0]["id"]
955
956 # Discover if this network is managed by a sdn controller
957 sdn_net_id = None
958 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
959 WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
960 ORDER="instance_scenario_id")
961 if result:
962 sdn_net_id = result[0]['sdn_net_id']
963
964 task["status"] = "DONE"
965 task["extra"]["vim_info"] = {}
966 task["extra"]["created"] = False
967 task["extra"]["vim_status"] = "BUILD"
968 task["extra"]["sdn_net_id"] = sdn_net_id
969 task["error_msg"] = None
970 task["vim_id"] = vim_net_id
971 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
972 "error_msg": None, "sdn_net_id": sdn_net_id}
973 return instance_element_update
974
975 def get_net(self, task):
976 task_id = task["instance_action_id"] + "." + str(task["task_index"])
977 try:
978 params = task["params"]
979 filter_param = params[0]
980 instance_element_update = self._get_net_internal(task, filter_param)
981 return instance_element_update
982
983 except (vimconn.vimconnException, VimThreadException) as e:
984 self.logger.error("task={} get-net: {}".format(task_id, e))
985 task["status"] = "FAILED"
986 task["vim_id"] = None
987 task["error_msg"] = self._format_vim_error_msg(str(e))
988 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
989 "error_msg": task["error_msg"]}
990 return instance_element_update
991
992 def new_net(self, task):
993 vim_net_id = None
994 task_id = task["instance_action_id"] + "." + str(task["task_index"])
995 action_text = ""
996 try:
997 # FIND
998 if task["extra"].get("find"):
999 action_text = "finding"
1000 filter_param = task["extra"]["find"][0]
1001 try:
1002 instance_element_update = self._get_net_internal(task, filter_param)
1003 return instance_element_update
1004 except VimThreadExceptionNotFound:
1005 pass
1006 # CREATE
1007 params = task["params"]
1008 action_text = "creating VIM"
1009
1010 vim_net_id, created_items = self.vim.new_network(*params[0:5])
1011
1012 # net_name = params[0]
1013 # net_type = params[1]
1014 # wim_account_name = None
1015 # if len(params) >= 6:
1016 # wim_account_name = params[5]
1017
1018 # TODO fix at nfvo adding external port
1019 # if wim_account_name and self.vim.config["wim_external_ports"]:
1020 # # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
1021 # action_text = "attaching external port to ovim network"
1022 # sdn_port_name = "external_port"
1023 # sdn_port_data = {
1024 # "compute_node": "__WIM:" + wim_account_name[0:58],
1025 # "pci": None,
1026 # "vlan": network["vlan"],
1027 # "net_id": sdn_net_id,
1028 # "region": self.vim["config"]["datacenter_id"],
1029 # "name": sdn_port_name,
1030 # }
1031 # try:
1032 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1033 # except ovimException:
1034 # sdn_port_data["compute_node"] = "__WIM"
1035 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1036 # self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1037 # sdn_net_id))
1038 task["status"] = "DONE"
1039 task["extra"]["vim_info"] = {}
1040 # task["extra"]["sdn_net_id"] = sdn_net_id
1041 task["extra"]["vim_status"] = "BUILD"
1042 task["extra"]["created"] = True
1043 task["extra"]["created_items"] = created_items
1044 task["error_msg"] = None
1045 task["vim_id"] = vim_net_id
1046 instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD",
1047 "created": True, "error_msg": None}
1048 return instance_element_update
1049 except vimconn.vimconnException as e:
1050 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1051 task["status"] = "FAILED"
1052 task["vim_id"] = vim_net_id
1053 task["error_msg"] = self._format_vim_error_msg(str(e))
1054 # task["extra"]["sdn_net_id"] = sdn_net_id
1055 instance_element_update = {"vim_net_id": vim_net_id, "status": "VIM_ERROR",
1056 "error_msg": task["error_msg"]}
1057 return instance_element_update
1058
1059 def del_net(self, task):
1060 net_vim_id = task["vim_id"]
1061 # sdn_net_id = task["extra"].get("sdn_net_id")
1062 try:
1063 if net_vim_id:
1064 self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
1065 # if sdn_net_id:
1066 # # Delete any attached port to this sdn network. There can be ports associated to this network in case
1067 # # it was manually done using 'openmano vim-net-sdn-attach'
1068 # port_list = self.ovim.get_ports(columns={'uuid'},
1069 # filter={'name': 'external_port', 'net_id': sdn_net_id})
1070 # for port in port_list:
1071 # self.ovim.delete_port(port['uuid'], idempotent=True)
1072 # self.ovim.delete_network(sdn_net_id, idempotent=True)
1073 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1074 task["error_msg"] = None
1075 return None
1076 except vimconn.vimconnException as e:
1077 task["error_msg"] = self._format_vim_error_msg(str(e))
1078 if isinstance(e, vimconn.vimconnNotFoundException):
1079 # If not found mark as Done and fill error_msg
1080 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1081 return None
1082 task["status"] = "FAILED"
1083 return None
1084
1085 def new_or_update_sdn_net(self, task):
1086 wimconn_net_id = task["vim_id"]
1087 created_items = task["extra"].get("created_items")
1088 connected_ports = task["extra"].get("connected_ports", [])
1089 new_connected_ports = []
1090 last_update = task["extra"].get("last_update", 0)
1091 sdn_status = "BUILD"
1092 sdn_info = None
1093
1094 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1095 error_list = []
1096 try:
1097 # FIND
1098 if task["extra"].get("find"):
1099 wimconn_id = task["extra"]["find"][0]
1100 try:
1101 instance_element_update = self.sdnconnector.get_connectivity_service_status(wimconn_id)
1102 wimconn_net_id = wimconn_id
1103 instance_element_update = {"wim_internal_id": wimconn_net_id, "created": False, "status": "BUILD",
1104 "error_msg": None, }
1105 return instance_element_update
1106 except Exception as e:
1107 if isinstance(e, SdnConnectorError) and e.http_error == HTTPStatus.NOT_FOUND.value:
1108 pass
1109 else:
1110 self._proccess_sdn_exception(e)
1111
1112 params = task["params"]
1113 # CREATE
1114 # look for ports
1115 sdn_ports = []
1116 pending_ports = 0
1117
1118 ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]})
1119 sdn_need_update = False
1120 for port in ports:
1121 # TODO. Do not connect if already done
1122 if port.get("compute_node") and port.get("pci"):
1123 for map in self.port_mappings:
1124 if map.get("device_id") == port["compute_node"] and \
1125 map.get("device_interface_id") == port["pci"]:
1126 break
1127 else:
1128 if self.sdnconn_config.get("mapping_not_needed"):
1129 map = {
1130 "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]),
1131 "service_endpoint_encapsulation_info": {
1132 "vlan": port["vlan"],
1133 "mac": port["mac_address"],
1134 "device_id": port["compute_node"],
1135 "device_interface_id": port["pci"]
1136 }
1137 }
1138 else:
1139 map = None
1140 error_list.append("Port mapping not found for compute_node={} pci={}".format(
1141 port["compute_node"], port["pci"]))
1142
1143 if map:
1144 if port["uuid"] not in connected_ports or port["modified_at"] > last_update:
1145 sdn_need_update = True
1146 new_connected_ports.append(port["uuid"])
1147 sdn_ports.append({
1148 "service_endpoint_id": map["service_endpoint_id"],
1149 "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None,
1150 "service_endpoint_encapsulation_info": {
1151 "vlan": port["vlan"],
1152 "mac": port["mac_address"],
1153 "device_id": map.get("device_id"),
1154 "device_interface_id": map.get("device_interface_id"),
1155 "switch_dpid": map.get("switch_dpid"),
1156 "switch_port": map.get("switch_port"),
1157 "service_mapping_info": map.get("service_mapping_info"),
1158 }
1159 })
1160
1161 else:
1162 pending_ports += 1
1163 if pending_ports:
1164 error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}"
1165 .format(len(ports)-pending_ports, len(ports)))
1166 # if there are more ports to connect or they have been modified, call create/update
1167 if sdn_need_update and len(sdn_ports) >= 2:
1168 if not wimconn_net_id:
1169 if params[0] == "data":
1170 net_type = "ELAN"
1171 elif params[0] == "ptp":
1172 net_type = "ELINE"
1173 else:
1174 net_type = "L3"
1175
1176 wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service(net_type, sdn_ports)
1177 else:
1178 created_items = self.sdnconnector.edit_connectivity_service(wimconn_net_id, conn_info=created_items,
1179 connection_points=sdn_ports)
1180 last_update = time.time()
1181 connected_ports = new_connected_ports
1182 elif wimconn_net_id:
1183 try:
1184 wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id,
1185 conn_info=created_items)
1186 sdn_status = wim_status_dict["sdn_status"]
1187 if wim_status_dict.get("error_msg"):
1188 error_list.append(wim_status_dict.get("error_msg"))
1189 if wim_status_dict.get("sdn_info"):
1190 sdn_info = str(wim_status_dict.get("sdn_info"))
1191 except Exception as e:
1192 self._proccess_sdn_exception(e)
1193
1194 task["status"] = "DONE"
1195 task["extra"]["vim_info"] = {}
1196 # task["extra"]["sdn_net_id"] = sdn_net_id
1197 task["extra"]["vim_status"] = "BUILD"
1198 task["extra"]["created"] = True
1199 task["extra"]["created_items"] = created_items
1200 task["extra"]["connected_ports"] = connected_ports
1201 task["extra"]["last_update"] = last_update
1202 task["error_msg"] = self._format_vim_error_msg(" ; ".join(error_list))
1203 task["vim_id"] = wimconn_net_id
1204 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status,
1205 "created": True, "error_msg": task["error_msg"] or None}
1206 except (vimconn.vimconnException, SdnConnectorError) as e:
1207 self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e))
1208 task["status"] = "FAILED"
1209 task["vim_id"] = wimconn_net_id
1210 task["error_msg"] = self._format_vim_error_msg(str(e))
1211 # task["extra"]["sdn_net_id"] = sdn_net_id
1212 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR",
1213 "error_msg": task["error_msg"]}
1214 if sdn_info:
1215 instance_element_update["wim_info"] = sdn_info
1216 return instance_element_update
1217
1218 def del_sdn_net(self, task):
1219 wimconn_net_id = task["vim_id"]
1220 try:
1221 try:
1222 if wimconn_net_id:
1223 self.sdnconnector.delete_connectivity_service(wimconn_net_id, task["extra"].get("created_items"))
1224 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1225 task["error_msg"] = None
1226 return None
1227 except Exception as e:
1228 self._proccess_sdn_exception(e)
1229 except SdnConnectorError as e:
1230 task["error_msg"] = self._format_vim_error_msg(str(e))
1231 if e.http_code == HTTPStatus.NOT_FOUND.value:
1232 # If not found mark as Done and fill error_msg
1233 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1234 task["error_msg"] = None
1235 return None
1236 task["status"] = "FAILED"
1237 return None
1238
1239 # Service Function Instances
1240 def new_sfi(self, task):
1241 vim_sfi_id = None
1242 try:
1243 # Waits for interfaces to be ready (avoids failure)
1244 time.sleep(1)
1245 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1246 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1247 error_text = ""
1248 interfaces = task["depends"][dep_id]["extra"].get("interfaces")
1249
1250 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1251 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1252 ingress_vim_interface_id = None
1253 egress_vim_interface_id = None
1254 for vim_interface, interface_data in interfaces.items():
1255 if interface_data.get("interface_id") == ingress_interface_id:
1256 ingress_vim_interface_id = vim_interface
1257 break
1258 if ingress_interface_id != egress_interface_id:
1259 for vim_interface, interface_data in interfaces.items():
1260 if interface_data.get("interface_id") == egress_interface_id:
1261 egress_vim_interface_id = vim_interface
1262 break
1263 else:
1264 egress_vim_interface_id = ingress_vim_interface_id
1265 if not ingress_vim_interface_id or not egress_vim_interface_id:
1266 error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1267 ingress_vim_interface_id, egress_vim_interface_id)
1268 self.logger.error(error_text)
1269 task["error_msg"] = error_text
1270 task["status"] = "FAILED"
1271 task["vim_id"] = None
1272 return None
1273 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1274 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1275 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1276 ingress_port_id_list = [ingress_vim_interface_id]
1277 egress_port_id_list = [egress_vim_interface_id]
1278 name = "sfi-{}".format(task["item_id"][:8])
1279 # By default no form of IETF SFC Encapsulation will be used
1280 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1281
1282 task["extra"]["created"] = True
1283 task["extra"]["vim_status"] = "ACTIVE"
1284 task["error_msg"] = None
1285 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1286 task["vim_id"] = vim_sfi_id
1287 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1288 return instance_element_update
1289
1290 except (vimconn.vimconnException, VimThreadException) as e:
1291 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1292 error_text = self._format_vim_error_msg(str(e))
1293 task["error_msg"] = error_text
1294 task["status"] = "FAILED"
1295 task["vim_id"] = None
1296 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1297 return instance_element_update
1298
1299 def del_sfi(self, task):
1300 sfi_vim_id = task["vim_id"]
1301 try:
1302 self.vim.delete_sfi(sfi_vim_id)
1303 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1304 task["error_msg"] = None
1305 return None
1306
1307 except vimconn.vimconnException as e:
1308 task["error_msg"] = self._format_vim_error_msg(str(e))
1309 if isinstance(e, vimconn.vimconnNotFoundException):
1310 # If not found mark as Done and fill error_msg
1311 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1312 return None
1313 task["status"] = "FAILED"
1314 return None
1315
1316 def new_sf(self, task):
1317 vim_sf_id = None
1318 try:
1319 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1320 error_text = ""
1321 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1322 # sfis = next(iter(task.get("depends").values())).get("extra").get("params")[5]
1323 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1324 sfi_id_list = []
1325 for sfi in sfis:
1326 sfi_id_list.append(sfi.get("vim_id"))
1327 name = "sf-{}".format(task["item_id"][:8])
1328 # By default no form of IETF SFC Encapsulation will be used
1329 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1330
1331 task["extra"]["created"] = True
1332 task["extra"]["vim_status"] = "ACTIVE"
1333 task["error_msg"] = None
1334 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1335 task["vim_id"] = vim_sf_id
1336 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1337 return instance_element_update
1338
1339 except (vimconn.vimconnException, VimThreadException) as e:
1340 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1341 error_text = self._format_vim_error_msg(str(e))
1342 task["error_msg"] = error_text
1343 task["status"] = "FAILED"
1344 task["vim_id"] = None
1345 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1346 return instance_element_update
1347
1348 def del_sf(self, task):
1349 sf_vim_id = task["vim_id"]
1350 try:
1351 self.vim.delete_sf(sf_vim_id)
1352 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1353 task["error_msg"] = None
1354 return None
1355
1356 except vimconn.vimconnException as e:
1357 task["error_msg"] = self._format_vim_error_msg(str(e))
1358 if isinstance(e, vimconn.vimconnNotFoundException):
1359 # If not found mark as Done and fill error_msg
1360 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1361 return None
1362 task["status"] = "FAILED"
1363 return None
1364
1365 def new_classification(self, task):
1366 vim_classification_id = None
1367 try:
1368 params = task["params"]
1369 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1370 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1371 error_text = ""
1372 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys()
1373 # Bear in mind that different VIM connectors might support Classifications differently.
1374 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1375 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1376 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1377 # using the IPv4 flow classifier.
1378 name = "c-{}".format(task["item_id"][:8])
1379 # if not CIDR is given for the IP addresses, add /32:
1380 ip_proto = int(params.get("ip_proto"))
1381 source_ip = params.get("source_ip")
1382 destination_ip = params.get("destination_ip")
1383 source_port = params.get("source_port")
1384 destination_port = params.get("destination_port")
1385 definition = {"logical_source_port": interfaces[0]}
1386 if ip_proto:
1387 if ip_proto == 1:
1388 ip_proto = 'icmp'
1389 elif ip_proto == 6:
1390 ip_proto = 'tcp'
1391 elif ip_proto == 17:
1392 ip_proto = 'udp'
1393 definition["protocol"] = ip_proto
1394 if source_ip:
1395 if '/' not in source_ip:
1396 source_ip += '/32'
1397 definition["source_ip_prefix"] = source_ip
1398 if source_port:
1399 definition["source_port_range_min"] = source_port
1400 definition["source_port_range_max"] = source_port
1401 if destination_port:
1402 definition["destination_port_range_min"] = destination_port
1403 definition["destination_port_range_max"] = destination_port
1404 if destination_ip:
1405 if '/' not in destination_ip:
1406 destination_ip += '/32'
1407 definition["destination_ip_prefix"] = destination_ip
1408
1409 vim_classification_id = self.vim.new_classification(
1410 name, 'legacy_flow_classifier', definition)
1411
1412 task["extra"]["created"] = True
1413 task["extra"]["vim_status"] = "ACTIVE"
1414 task["error_msg"] = None
1415 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1416 task["vim_id"] = vim_classification_id
1417 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
1418 "error_msg": None}
1419 return instance_element_update
1420
1421 except (vimconn.vimconnException, VimThreadException) as e:
1422 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1423 error_text = self._format_vim_error_msg(str(e))
1424 task["error_msg"] = error_text
1425 task["status"] = "FAILED"
1426 task["vim_id"] = None
1427 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1428 return instance_element_update
1429
1430 def del_classification(self, task):
1431 classification_vim_id = task["vim_id"]
1432 try:
1433 self.vim.delete_classification(classification_vim_id)
1434 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1435 task["error_msg"] = None
1436 return None
1437
1438 except vimconn.vimconnException as e:
1439 task["error_msg"] = self._format_vim_error_msg(str(e))
1440 if isinstance(e, vimconn.vimconnNotFoundException):
1441 # If not found mark as Done and fill error_msg
1442 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1443 return None
1444 task["status"] = "FAILED"
1445 return None
1446
1447 def new_sfp(self, task):
1448 vim_sfp_id = None
1449 try:
1450 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1451 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
1452 task.get("extra").get("depends_on")]
1453 error_text = ""
1454 sf_id_list = []
1455 classification_id_list = []
1456 for dep in depending_tasks:
1457 vim_id = dep.get("vim_id")
1458 resource = dep.get("item")
1459 if resource == "instance_sfs":
1460 sf_id_list.append(vim_id)
1461 elif resource == "instance_classifications":
1462 classification_id_list.append(vim_id)
1463
1464 name = "sfp-{}".format(task["item_id"][:8])
1465 # By default no form of IETF SFC Encapsulation will be used
1466 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1467
1468 task["extra"]["created"] = True
1469 task["extra"]["vim_status"] = "ACTIVE"
1470 task["error_msg"] = None
1471 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1472 task["vim_id"] = vim_sfp_id
1473 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1474 return instance_element_update
1475
1476 except (vimconn.vimconnException, VimThreadException) as e:
1477 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1478 error_text = self._format_vim_error_msg(str(e))
1479 task["error_msg"] = error_text
1480 task["status"] = "FAILED"
1481 task["vim_id"] = None
1482 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1483 return instance_element_update
1484
1485 def del_sfp(self, task):
1486 sfp_vim_id = task["vim_id"]
1487 try:
1488 self.vim.delete_sfp(sfp_vim_id)
1489 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1490 task["error_msg"] = None
1491 return None
1492
1493 except vimconn.vimconnException as e:
1494 task["error_msg"] = self._format_vim_error_msg(str(e))
1495 if isinstance(e, vimconn.vimconnNotFoundException):
1496 # If not found mark as Done and fill error_msg
1497 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1498 return None
1499 task["status"] = "FAILED"
1500 return None