initial implementation of vimconn_fos for Eclipse fog05 VIM
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is something to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" column contains the same value, to know they refer to the same vim. In case of deletion, if
30 there are related tasks using this element, it is not deleted. The vim_info needed to delete is transferred to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINISHED, but nothing has been done to complete the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (complete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of instance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to vim_id
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS or instance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import Queue
82 import logging
83 import vimconn
84 import vimconn_openvim
85 import vimconn_aws
86 import vimconn_opennebula
87 import vimconn_openstack
88 import vimconn_vmware
89 import vimconn_fos
90 import yaml
91 from db_base import db_base_Exception
92 from lib_osm_openvim.ovim import ovimException
93 from copy import deepcopy
94
95 __author__ = "Alfonso Tierno, Pablo Montes"
96 __date__ = "$28-Sep-2017 12:07:15$"
97
# Registry of the supported VIM types: maps the datacenter "type" stored at database
# to the python module that provides the matching 'vimconnector' class
vim_module = dict(
    openvim=vimconn_openvim,
    aws=vimconn_aws,
    opennebula=vimconn_opennebula,
    openstack=vimconn_openstack,
    vmware=vimconn_vmware,
    fos=vimconn_fos,
)
106
107
def is_task_id(task_id):
    """
    Tell if the provided identifier is a reference to a task instead of a real VIM identifier.
    :param task_id: identifier to check. Task references are text starting with "TASK-"
    :return: True if task_id starts with "TASK-". False otherwise, including when it is not a text at all
        (e.g. None or an integer), so that callers do not need to check the type in advance
    """
    try:
        return task_id.startswith("TASK-")
    except AttributeError:
        # not a string-like object, so it cannot be a task reference
        return False
110
111
class VimThreadException(Exception):
    """Base class of the errors raised by this module while processing a task."""
114
115
class VimThreadExceptionNotFound(VimThreadException):
    """Specialization of VimThreadException for the 'not found' condition."""
118
119
120 class vim_thread(threading.Thread):
121 REFRESH_BUILD = 5 # 5 seconds
122 REFRESH_ACTIVE = 60 # 1 minute
123 REFRESH_ERROR = 600
124 REFRESH_DELETE = 3600 * 10
125
126 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
127 db=None, db_lock=None, ovim=None):
128 """Init a thread.
129 Arguments:
130 'id' number of thead
131 'name' name of thread
132 'host','user': host ip or name to manage and user
133 'db', 'db_lock': database class and lock to use it in exclusion
134 """
135 threading.Thread.__init__(self)
136 self.vim = None
137 self.error_status = None
138 self.datacenter_name = datacenter_name
139 self.datacenter_tenant_id = datacenter_tenant_id
140 self.ovim = ovim
141 if not name:
142 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
143 else:
144 self.name = name
145 self.vim_persistent_info = {}
146 self.my_id = self.name[:64]
147
148 self.logger = logging.getLogger('openmano.vim.' + self.name)
149 self.db = db
150 self.db_lock = db_lock
151
152 self.task_lock = task_lock
153 self.task_queue = Queue.Queue(2000)
154
155 def get_vimconnector(self):
156 try:
157 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
158 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
159 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
160 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
161 'user', 'passwd', 'dt.config as dt_config')
162 where_ = {"dt.uuid": self.datacenter_tenant_id}
163 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
164 vim = vims[0]
165 vim_config = {}
166 if vim["config"]:
167 vim_config.update(yaml.load(vim["config"]))
168 if vim["dt_config"]:
169 vim_config.update(yaml.load(vim["dt_config"]))
170 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
171 vim_config['datacenter_id'] = vim.get('datacenter_id')
172
173 # get port_mapping
174 with self.db_lock:
175 vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
176 db_filter={"region": vim_config['datacenter_id'], "pci": None})
177
178 self.vim = vim_module[vim["type"]].vimconnector(
179 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
180 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
181 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
182 user=vim['user'], passwd=vim['passwd'],
183 config=vim_config, persistent_info=self.vim_persistent_info
184 )
185 self.error_status = None
186 except Exception as e:
187 self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
188 self.vim = None
189 self.error_status = "Error loading vimconnector: {}".format(e)
190
191 def _get_db_task(self):
192 """
193 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
194 :return: None
195 """
196 now = time.time()
197 try:
198 database_limit = 20
199 task_related = None
200 while True:
201 # get 20 (database_limit) entries each time
202 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
203 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
204 "status": ['SCHEDULED', 'BUILD', 'DONE'],
205 "worker": [None, self.my_id], "modified_at<=": now
206 },
207 ORDER_BY=("modified_at", "created_at",),
208 LIMIT=database_limit)
209 if not vim_actions:
210 return None, None
211 # if vim_actions[0]["modified_at"] > now:
212 # return int(vim_actions[0] - now)
213 for task in vim_actions:
214 # block related task
215 if task_related == task["related"]:
216 continue # ignore if a locking has already tried for these task set
217 task_related = task["related"]
218 # lock ...
219 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
220 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
221 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
222 "worker": [None, self.my_id],
223 "related": task_related,
224 "item": task["item"],
225 })
226 # ... and read all related and check if locked
227 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
228 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
229 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
230 "related": task_related,
231 "item": task["item"],
232 },
233 ORDER_BY=("created_at",))
234 # check that all related tasks have been locked. If not release and try again. It can happen
235 # for race conditions if a new related task has been inserted by nfvo in the process
236 some_tasks_locked = False
237 some_tasks_not_locked = False
238 creation_task = None
239 for relate_task in related_tasks:
240 if relate_task["worker"] != self.my_id:
241 some_tasks_not_locked = True
242 else:
243 some_tasks_locked = True
244 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
245 creation_task = relate_task
246 if some_tasks_not_locked:
247 if some_tasks_locked: # unlock
248 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
249 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
250 "worker": self.my_id,
251 "related": task_related,
252 "item": task["item"],
253 })
254 continue
255
256 # task of creation must be the first in the list of related_task
257 assert(related_tasks[0]["action"] in ("CREATE", "FIND"))
258
259 if task["extra"]:
260 extra = yaml.load(task["extra"])
261 else:
262 extra = {}
263 task["extra"] = extra
264 if extra.get("depends_on"):
265 task["depends"] = {}
266 if extra.get("params"):
267 task["params"] = deepcopy(extra["params"])
268 return task, related_tasks
269 except Exception as e:
270 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
271 return None, None
272
273 def _delete_task(self, task):
274 """
275 Determine if this task need to be done or superseded
276 :return: None
277 """
278
279 def copy_extra_created(copy_to, copy_from):
280 copy_to["created"] = copy_from["created"]
281 if copy_from.get("sdn_net_id"):
282 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
283 if copy_from.get("interfaces"):
284 copy_to["interfaces"] = copy_from["interfaces"]
285 if copy_from.get("created_items"):
286 if not copy_to.get("created_items"):
287 copy_to["created_items"] = {}
288 copy_to["created_items"].update(copy_from["created_items"])
289
290 task_create = None
291 dependency_task = None
292 deletion_needed = False
293 if task["status"] == "FAILED":
294 return # TODO need to be retry??
295 try:
296 # get all related tasks
297 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
298 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
299 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
300 "action": ["FIND", "CREATE"],
301 "related": task["related"],
302 },
303 ORDER_BY=("created_at",),
304 )
305 for related_task in related_tasks:
306 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
307 task_create = related_task
308 # TASK_CREATE
309 if related_task["extra"]:
310 extra_created = yaml.load(related_task["extra"])
311 if extra_created.get("created"):
312 deletion_needed = True
313 related_task["extra"] = extra_created
314 elif not dependency_task:
315 dependency_task = related_task
316 if task_create and dependency_task:
317 break
318
319 # mark task_create as FINISHED
320 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
321 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
322 "instance_action_id": task_create["instance_action_id"],
323 "task_index": task_create["task_index"]
324 })
325 if not deletion_needed:
326 return
327 elif dependency_task:
328 # move create information from task_create to relate_task
329 extra_new_created = yaml.load(dependency_task["extra"]) or {}
330 extra_new_created["created"] = extra_created["created"]
331 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
332
333 self.db.update_rows("vim_wim_actions",
334 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
335 width=256),
336 "vim_id": task_create.get("vim_id")},
337 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
338 "instance_action_id": dependency_task["instance_action_id"],
339 "task_index": dependency_task["task_index"]
340 })
341 return False
342 else:
343 task["vim_id"] = task_create["vim_id"]
344 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
345 return True
346
347 except Exception as e:
348 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
349
350 def _refres_vm(self, task):
351 """Call VIM to get VMs status"""
352 database_update = None
353
354 vim_id = task["vim_id"]
355 vm_to_refresh_list = [vim_id]
356 try:
357 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
358 vim_info = vim_dict[vim_id]
359 except vimconn.vimconnException as e:
360 # Mark all tasks at VIM_ERROR status
361 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
362 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
363
364 task_id = task["instance_action_id"] + "." + str(task["task_index"])
365 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
366
367 # check and update interfaces
368 task_warning_msg = ""
369 for interface in vim_info.get("interfaces", ()):
370 vim_interface_id = interface["vim_interface_id"]
371 if vim_interface_id not in task["extra"]["interfaces"]:
372 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
373 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
374 continue
375 task_interface = task["extra"]["interfaces"][vim_interface_id]
376 task_vim_interface = task_interface.get("vim_info")
377 if task_vim_interface != interface:
378 # delete old port
379 if task_interface.get("sdn_port_id"):
380 try:
381 with self.db_lock:
382 self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
383 task_interface["sdn_port_id"] = None
384 except ovimException as e:
385 error_text = "ovimException deleting external_port={}: {}".format(
386 task_interface["sdn_port_id"], e)
387 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
388 task_warning_msg += error_text
389 # TODO Set error_msg at instance_nets instead of instance VMs
390
391 # Create SDN port
392 sdn_net_id = task_interface.get("sdn_net_id")
393 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
394 sdn_port_name = sdn_net_id + "." + task["vim_id"]
395 sdn_port_name = sdn_port_name[:63]
396 try:
397 with self.db_lock:
398 sdn_port_id = self.ovim.new_external_port(
399 {"compute_node": interface["compute_node"],
400 "pci": interface["pci"],
401 "vlan": interface.get("vlan"),
402 "net_id": sdn_net_id,
403 "region": self.vim["config"]["datacenter_id"],
404 "name": sdn_port_name,
405 "mac": interface.get("mac_address")})
406 task_interface["sdn_port_id"] = sdn_port_id
407 except (ovimException, Exception) as e:
408 error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
409 format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
410 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
411 task_warning_msg += error_text
412 # TODO Set error_msg at instance_nets instead of instance VMs
413
414 self.db.update_rows('instance_interfaces',
415 UPDATE={"mac_address": interface.get("mac_address"),
416 "ip_address": interface.get("ip_address"),
417 "vim_interface_id": interface.get("vim_interface_id"),
418 "vim_info": interface.get("vim_info"),
419 "sdn_port_id": task_interface.get("sdn_port_id"),
420 "compute_node": interface.get("compute_node"),
421 "pci": interface.get("pci"),
422 "vlan": interface.get("vlan")},
423 WHERE={'uuid': task_interface["iface_id"]})
424 task_interface["vim_info"] = interface
425
426 # check and update task and instance_vms database
427 vim_info_error_msg = None
428 if vim_info.get("error_msg"):
429 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
430 elif task_warning_msg:
431 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
432 task_vim_info = task["extra"].get("vim_info")
433 task_error_msg = task.get("error_msg")
434 task_vim_status = task["extra"].get("vim_status")
435 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
436 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
437 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
438 if vim_info.get("vim_info"):
439 database_update["vim_info"] = vim_info["vim_info"]
440
441 task["extra"]["vim_status"] = vim_info["status"]
442 task["error_msg"] = vim_info_error_msg
443 if vim_info.get("vim_info"):
444 task["extra"]["vim_info"] = vim_info["vim_info"]
445
446 return database_update
447
448 def _refres_net(self, task):
449 """Call VIM to get network status"""
450 database_update = None
451
452 vim_id = task["vim_id"]
453 net_to_refresh_list = [vim_id]
454 try:
455 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
456 vim_info = vim_dict[vim_id]
457 except vimconn.vimconnException as e:
458 # Mark all tasks at VIM_ERROR status
459 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
460 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
461
462 task_id = task["instance_action_id"] + "." + str(task["task_index"])
463 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
464
465 task_vim_info = task["extra"].get("vim_info")
466 task_vim_status = task["extra"].get("vim_status")
467 task_error_msg = task.get("error_msg")
468 task_sdn_net_id = task["extra"].get("sdn_net_id")
469
470 vim_info_status = vim_info["status"]
471 vim_info_error_msg = vim_info.get("error_msg")
472 # get ovim status
473 if task_sdn_net_id:
474 try:
475 with self.db_lock:
476 sdn_net = self.ovim.show_network(task_sdn_net_id)
477 except (ovimException, Exception) as e:
478 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
479 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
480 sdn_net = {"status": "ERROR", "last_error": text_error}
481 if sdn_net["status"] == "ERROR":
482 if not vim_info_error_msg:
483 vim_info_error_msg = str(sdn_net.get("last_error"))
484 else:
485 vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
486 self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
487 self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
488 vim_info_status = "ERROR"
489 elif sdn_net["status"] == "BUILD":
490 if vim_info_status == "ACTIVE":
491 vim_info_status = "BUILD"
492
493 # update database
494 if vim_info_error_msg:
495 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
496 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
497 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
498 task["extra"]["vim_status"] = vim_info_status
499 task["error_msg"] = vim_info_error_msg
500 if vim_info.get("vim_info"):
501 task["extra"]["vim_info"] = vim_info["vim_info"]
502 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
503 if vim_info.get("vim_info"):
504 database_update["vim_info"] = vim_info["vim_info"]
505 return database_update
506
def _proccess_pending_tasks(self, task, related_tasks):
    """
    Process a task previously locked by _get_db_task and store the result at database.
    Steps:
      - For a SCHEDULED task, check the tasks it depends on. If one of them is still SCHEDULED this task is
        unlocked and postponed; if one of them is FAILED this task fails too.
      - Execute the action: creation, deletion, find or refresh at VIM, or just copy the result of the first
        related task when this one refers to the same VIM element.
      - Update at database this task, the related tasks, the instance_actions counters and the instance
        table of the item, and unlock the tasks.
    :param task: task dictionary as returned by _get_db_task. It is modified with the result
    :param related_tasks: list of tasks over the same VIM element; the first one is the task of creation
    :return: None
    """
    old_task_status = task["status"]
    create_or_find = False  # if as result of processing this task something is created or found
    next_refresh = 0
    database_update = None
    # items that only admit CREATE and DELETE: item -> (name of creation method, name of deletion method)
    create_delete_methods = {
        'instance_sfis': ("new_sfi", "del_sfi"),
        'instance_sfs': ("new_sf", "del_sf"),
        'instance_classifications': ("new_classification", "del_classification"),
        'instance_sfps': ("new_sfp", "del_sfp"),
    }

    try:
        if task["status"] == "SCHEDULED":
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            dependency_modified_at = 0
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = self._look_for_task(task["instance_action_id"], task_index)
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], task_index))
                # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
                # database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    dependency_modified_at = task_dependency["modified_at"]
                    break
                elif task_dependency["status"] == "FAILED":
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))

                task["depends"]["TASK-"+str(task_index)] = task_dependency["vim_id"]
                task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] =\
                    task_dependency["vim_id"]
            if dependency_not_completed:
                # Move this task to the time dependency is going to be modified plus 10 seconds.
                self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
                                    UPDATE={"worker": None},
                                    WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "worker": self.my_id,
                                           "related": task["related"],
                                           })
                return

        if task["action"] == "DELETE":
            deleted_needed = self._delete_task(task)
            if not deleted_needed:
                task["status"] = "SUPERSEDED"  # with FINISHED instead of DONE it will not be refreshing
                task["error_msg"] = None

        item = task["item"]
        action = task["action"]
        if task["status"] == "SUPERSEDED":
            # not needed to do anything but update database with the new status
            database_update = None
        elif not self.vim:
            task["status"] = "FAILED"
            task["error_msg"] = self.error_status
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
        elif task["item_id"] != related_tasks[0]["item_id"] and action in ("FIND", "CREATE"):
            # Do nothing, just copy values from one to another and update database
            first_task = related_tasks[0]
            task["status"] = first_task["status"]
            task["error_msg"] = first_task["error_msg"]
            task["vim_id"] = first_task["vim_id"]
            # the first task can have an empty "extra", or an "extra" without "vim_status" yet
            first_extra = (yaml.load(first_task["extra"]) if first_task["extra"] else None) or {}
            if "vim_status" in first_extra:
                task["extra"]["vim_status"] = first_extra["vim_status"]
            next_refresh = first_task["modified_at"] + 0.001
            database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
                               "error_msg": task["error_msg"]}
            if item == 'instance_vms':
                database_update["vim_vm_id"] = task["vim_id"]
            elif item == 'instance_nets':
                database_update["vim_net_id"] = task["vim_id"]
        elif item == 'instance_vms':
            if task["status"] in ('BUILD', 'DONE') and action in ("FIND", "CREATE"):
                database_update = self._refres_vm(task)
                create_or_find = True
            elif action == "CREATE":
                create_or_find = True
                database_update = self.new_vm(task)
            elif action == "DELETE":
                self.del_vm(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        elif item == 'instance_nets':
            if task["status"] in ('BUILD', 'DONE') and action in ("FIND", "CREATE"):
                database_update = self._refres_net(task)
                create_or_find = True
            elif action == "CREATE":
                create_or_find = True
                database_update = self.new_net(task)
            elif action == "DELETE":
                self.del_net(task)
            elif action == "FIND":
                database_update = self.get_net(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        elif item in create_delete_methods:
            new_method_name, del_method_name = create_delete_methods[item]
            if action == "CREATE":
                create_or_find = True
                database_update = getattr(self, new_method_name)(task)
            elif action == "DELETE":
                getattr(self, del_method_name)(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        else:
            raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
            # TODO
    except (VimThreadException, vimconn.vimconnException) as e:
        # vimconnException is raised above for unknown actions/items. It must be handled here as well;
        # otherwise the task would remain locked and SCHEDULED, and it would be fetched again and again
        task["error_msg"] = str(e)
        task["status"] = "FAILED"
        database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
        if task["item"] == 'instance_vms':
            database_update["vim_vm_id"] = None
        elif task["item"] == 'instance_nets':
            database_update["vim_net_id"] = None

    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    # "params" is only present at the task when its "extra" contains them
    self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
        task_id, task["item"], task["action"], task["status"],
        task.get("vim_id") if task["status"] == "DONE" else task.get("error_msg"), task.get("params")))
    try:
        if not next_refresh:
            if task["status"] == "DONE":
                next_refresh = time.time()
                vim_status = task["extra"].get("vim_status")
                if vim_status == "BUILD":
                    next_refresh += self.REFRESH_BUILD
                elif vim_status in ("ERROR", "VIM_ERROR"):
                    next_refresh += self.REFRESH_ERROR
                elif vim_status == "DELETED":
                    next_refresh += self.REFRESH_DELETE
                else:
                    next_refresh += self.REFRESH_ACTIVE
            elif task["status"] == "FAILED":
                next_refresh = time.time() + self.REFRESH_DELETE

        if create_or_find:
            # modify all related task with action FIND/CREATED non SCHEDULED
            self.db.update_rows(
                table="vim_wim_actions", modified_time=next_refresh + 0.001,
                UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
                        "error_msg": task.get("error_msg"),
                        },

                WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                       "worker": self.my_id,
                       "action": ["FIND", "CREATE"],
                       "related": task["related"],
                       "status<>": "SCHEDULED",
                       })
        # modify own task
        self.db.update_rows(
            table="vim_wim_actions", modified_time=next_refresh,
            UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
                    "error_msg": task.get("error_msg"),
                    "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
            WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
        # Unlock tasks
        self.db.update_rows(
            table="vim_wim_actions", modified_time=0,
            UPDATE={"worker": None},
            WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                   "worker": self.my_id,
                   "related": task["related"],
                   })

        # Update table instance_actions
        if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
            self.db.update_rows(
                table="instance_actions",
                UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
                WHERE={"uuid": task["instance_action_id"]})
        if database_update:
            self.db.update_rows(table=task["item"],
                                UPDATE=database_update,
                                WHERE={"related": task["related"]})
    except db_base_Exception as e:
        self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
714
715 def insert_task(self, task):
716 try:
717 self.task_queue.put(task, False)
718 return None
719 except Queue.Full:
720 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
721
722 def del_task(self, task):
723 with self.task_lock:
724 if task["status"] == "SCHEDULED":
725 task["status"] = "SUPERSEDED"
726 return True
727 else: # task["status"] == "processing"
728 self.task_lock.release()
729 return False
730
731 def run(self):
732 self.logger.debug("Starting")
733 while True:
734 self.get_vimconnector()
735 self.logger.debug("Vimconnector loaded")
736 reload_thread = False
737
738 while True:
739 try:
740 while not self.task_queue.empty():
741 task = self.task_queue.get()
742 if isinstance(task, list):
743 pass
744 elif isinstance(task, str):
745 if task == 'exit':
746 return 0
747 elif task == 'reload':
748 reload_thread = True
749 break
750 self.task_queue.task_done()
751 if reload_thread:
752 break
753
754 task, related_tasks = self._get_db_task()
755 if task:
756 self._proccess_pending_tasks(task, related_tasks)
757 else:
758 time.sleep(5)
759
760 except Exception as e:
761 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
762
763 self.logger.debug("Finishing")
764
def _look_for_task(self, instance_action_id, task_id):
    """
    Obtain one task from the vim_wim_actions database table
    :param instance_action_id: instance_action_id used unless task_id provides its own
    :param task_id: identifies the task. Accepted formats:
        <task index>: integer
        TASK-<task index>: backward compatibility
        [TASK-]<instance_action_id>.<task index>: this instance_action_id replaces the one of the parameter
    :return: The task dictionary, with "extra" decoded, "params" taken from extra and an empty "depends";
        or None if there is not such a task
    """
    task_index = task_id
    if not isinstance(task_id, int):
        plain_id = task_id[5:] if task_id.startswith("TASK-") else task_id
        # text after the last dot is the index; text before it, if any, is the instance_action_id
        own_action_id, _, task_index = plain_id.rpartition(".")
        if own_action_id:
            instance_action_id = own_action_id

    where_ = {"instance_action_id": instance_action_id, "task_index": task_index}
    rows = self.db.get_rows(FROM="vim_wim_actions", WHERE=where_)
    if not rows:
        return None

    found = rows[0]
    found["params"] = None
    found["depends"] = {}
    if not found["extra"]:
        found["extra"] = {}
        return found
    # NOTE(review): yaml.load is used without an explicit Loader; "extra" is read from the database
    decoded = yaml.load(found["extra"])
    found["extra"] = decoded
    found["params"] = decoded.get("params")
    return found
798
@staticmethod
def _format_vim_error_msg(error_text, max_length=1024):
    """
    Shorten a too long error text keeping its beginning and its end
    :param error_text: text to shorten. Can be None or empty
    :param max_length: texts with this length or longer are shortened
    :return: the same error_text if it is empty or shorter than max_length; otherwise the head and the tail
        of the text joined by " ... "
    """
    if not error_text or len(error_text) < max_length:
        return error_text
    head = error_text[:max_length // 2 - 3]
    tail = error_text[-max_length // 2 + 3:]
    return head + " ... " + tail
804
def new_vm(self, task):
    """
    Create a VM at VIM calling vimconnector new_vminstance with task["params"] as positional arguments
    :param task: task dictionary. params[5] is the list of interfaces. An interface whose "net_id" is a task
        id is changed to the "vim_id" of that task, taken from task["depends"]. On return the task contains
        the resulting status, vim_id, error_msg and, at "extra", the interfaces and created_items
    :return: a dict with the content to update the instance_vms database table, both on success
        (status "BUILD") and on failure (status "VIM_ERROR")
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends")
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                # entries of "depends" are task dictionaries; the identifier at VIM is its "vim_id"
                network_id = task["depends"][net["net_id"]].get("vim_id")
                if not network_id:
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(depends[net["net_id"]]["error_msg"]))
                net["net_id"] = network_id
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for sdn_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            result = self.db.get_rows(
                SELECT=('sdn_net_id', 'interface_id'),
                FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
                task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
            else:
                # there is not an exception being handled here, so no traceback is requested
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(
                    task_id, iface["uuid"]))

        task["vim_info"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["extra"]["vim_status"] = "BUILD"
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return instance_element_update
857
def del_vm(self, task):
    """
    Delete a VM at VIM. Before that, it deletes the sdn ports ("sdn_port_id") stored at the interfaces of
    task["extra"]. An error deleting a sdn port is logged and does not stop the deletion
    :param task: task dictionary. On return "status" is "FINISHED" if the VM is deleted or not found at VIM,
        or "FAILED" on any other VIM error; "error_msg" contains the error text, if any
    :return: None
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # a dict is needed as default because values() is called over it; a task may have no interfaces stored
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
                except ovimException as e:
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
        return None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
            return None
        task["status"] = "FAILED"
    return None
886
def _get_net_internal(self, task, filter_param):
    """
    Shared by get_net and new_net: look for exactly one network at VIM matching filter_param
    :param task: task of the find or find-or-create action. It is filled with the result
    :param filter_param: filter passed to the vimconnector get_network_list
    :return: a dict with the content to update the instance_nets database table. Raises
        VimThreadExceptionNotFound if there is no network matching, VimThreadException if there are several
    """
    candidates = self.vim.get_network_list(filter_param)
    if not candidates:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(candidates) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = candidates[0]["id"]

    # Discover if this network is managed by a sdn controller
    rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                            WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
                            ORDER="instance_scenario_id")
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    task["extra"].update({"vim_info": {}, "created": False, "vim_status": "BUILD", "sdn_net_id": sdn_net_id})
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
920
def get_net(self, task):
    """
    Find an existing network at VIM using the filter stored at the first position of task["params"]
    :param task: task dictionary. It is filled with the result of the search
    :return: a dict with the content to update the instance_nets database table. On error the task is set
        as "FAILED" and the returned dict contains status "VIM_ERROR" and the error message
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        net_filter = task["params"][0]
        return self._get_net_internal(task, net_filter)
    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("task={} get-net: {}".format(task_id, exc))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(exc))
        task["error_msg"] = error_text
        return {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
937
def new_net(self, task):
    """
    Find or create a network at VIM.
    If task["extra"]["find"] is provided, first it looks for an existing network with this filter. If it is
    not found, or there is not "find", the network is created with the first three items of task["params"]
    (the first two are the name and the type). For "data" and "ptp" types, when the VIM config contains
    'sdn-controller', a network is also created at ovim; and if params contains a fourth item (wim account
    name) and the VIM config option "wim_external_ports" is set, an external port is attached to it.
    :param task: task dictionary. It is filled with the result: status, vim_id, error_msg and extra
    :return: a dict with the content to update the instance_nets database table, both on success and on
        failure (status "VIM_ERROR")
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return instance_element_update
            except VimThreadExceptionNotFound:
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id, created_items = self.vim.new_network(*params[0:3])

        net_name = params[0]
        net_type = params[1]
        wim_account_name = None
        if len(params) >= 4:
            wim_account_name = params[3]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # get() is used also for the message, because 'encapsulation' can be missing
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)

            # a missing "wim_external_ports" option is the same as disabled
            if wim_account_name and self.vim.config.get("wim_external_ports"):
                # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
                action_text = "attaching external port to ovim network"
                # task["vim_id"] is not assigned until the end of this method, so the identifier of the
                # network just created is used for the name
                sdn_port_name = "{}.{}".format(sdn_net_id, vim_net_id)
                sdn_port_name = sdn_port_name[:63]
                sdn_port_data = {
                    "compute_node": "__WIM:" + wim_account_name[0:58],
                    "pci": None,
                    "vlan": network["vlan"],
                    "net_id": sdn_net_id,
                    "region": self.vim["config"]["datacenter_id"],
                    "name": sdn_port_name,
                }
                try:
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                except ovimException:
                    sdn_port_data["compute_node"] = "__WIM"
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
                                                                                        sdn_net_id))
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["vim_status"] = "BUILD"
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by _get_net_internal when several networks match the filter
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return instance_element_update
1020
def del_net(self, task):
    """
    Delete a network at VIM and, if task["extra"] contains "sdn_net_id", the ovim network and its ports
    :param task: task dictionary. On return "status" is "FINISHED" if deleted or not found at VIM, or
        "FAILED" on any other VIM or ovim error; "error_msg" contains the error text, if any
    :return: None
    """
    net_vim_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Ports can be attached to this sdn network, in case it was manually done using
            # 'openmano vim-net-sdn-attach'. They are deleted before the network
            with self.db_lock:
                external_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for external_port in external_ports:
                    self.ovim.delete_port(external_port['uuid'], idempotent=True)
                self.ovim.delete_network(sdn_net_id, idempotent=True)
        if net_vim_id:
            self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
    except ovimException as exc:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(exc)))
        final_status = "FAILED"
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # a network not found at VIM is considered as deleted, but error_msg is kept
        if isinstance(exc, vimconn.vimconnNotFoundException):
            final_status = "FINISHED"
        else:
            final_status = "FAILED"
    else:
        task["error_msg"] = None
        final_status = "FINISHED"
    task["status"] = final_status  # with FINISHED instead of DONE it will not be refreshing
    return None
1050
1051 # Service Function Instances
def new_sfi(self, task):
    """
    Create a Service Function Instance at VIM.
    The ingress and egress interfaces of task["extra"]["params"] are translated into interfaces at VIM using
    the "interfaces" stored at "extra" of the first task this one depends on.
    :param task: task dictionary. It is filled with the result: status, vim_id, error_msg and extra
    :return: a dict with the content to update the instance element at database, both on success
        (status "ACTIVE") and on failure (status "VIM_ERROR")
    """
    # computed before the 'try' so that it is always available for the error log
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        # Waits for interfaces to be ready (avoids failure)
        time.sleep(1)
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
        ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
        egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
        ingress_vim_interface_id = None
        egress_vim_interface_id = None
        # items() is used instead of iteritems(), that does not exist at Python 3 dictionaries
        for vim_interface, interface_data in interfaces.items():
            if interface_data.get("interface_id") == ingress_interface_id:
                ingress_vim_interface_id = vim_interface
                break
        if ingress_interface_id != egress_interface_id:
            for vim_interface, interface_data in interfaces.items():
                if interface_data.get("interface_id") == egress_interface_id:
                    egress_vim_interface_id = vim_interface
                    break
        else:
            egress_vim_interface_id = ingress_vim_interface_id
        if not ingress_vim_interface_id or not egress_vim_interface_id:
            error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
                ingress_vim_interface_id, egress_vim_interface_id)
            self.logger.error(error_text)
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            # same content as the one returned on a VIM error, so that the failure is also stored at the
            # instance element
            return {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
        # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
        ingress_port_id_list = [ingress_vim_interface_id]
        egress_port_id_list = [egress_vim_interface_id]
        name = "sfi-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return instance_element_update
1109
def del_sfi(self, task):
    """
    Delete a Service Function Instance at VIM
    :param task: task dictionary; "vim_id" is the identifier at VIM. On return "status" is "FINISHED" if
        deleted or not found at VIM, or "FAILED" on any other VIM error; "error_msg" has the error, if any
    :return: None
    """
    sfi_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfi(sfi_vim_id)
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # an element not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
1126
def new_sf(self, task):
    """
    Create a Service Function at VIM, composed by the Service Function Instances created by the tasks this
    one depends on (task["extra"]["depends_on"])
    :param task: task dictionary. It is filled with the result: status, vim_id, error_msg and extra
    :return: a dict with the content to update the instance element at database, both on success
        (status "ACTIVE") and on failure (status "VIM_ERROR")
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        dependency_names = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
        dependencies = [task.get("depends").get(dep_name) for dep_name in dependency_names]
        # identifiers at VIM of every Service Function Instance
        sfi_id_list = [dependency.get("vim_id") for dependency in dependencies]
        name = "sf-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sf_id
        return {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(exc))
        error_text = self._format_vim_error_msg(str(exc))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1158
def del_sf(self, task):
    """
    Delete a Service Function at VIM
    :param task: task dictionary; "vim_id" is the identifier at VIM. On return "status" is "FINISHED" if
        deleted or not found at VIM, or "FAILED" on any other VIM error; "error_msg" has the error, if any
    :return: None
    """
    sf_vim_id = task["vim_id"]
    try:
        self.vim.delete_sf(sf_vim_id)
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # an element not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
1175
def new_classification(self, task):
    """
    Create a Classification at VIM using the match fields of task["params"] (ip_proto, source_ip,
    destination_ip, source_port, destination_port). All of them are optional.
    The logical source port is the first interface stored at "extra" of the first task this one depends on.
    :param task: task dictionary. It is filled with the result: status, vim_id, error_msg and extra
    :return: a dict with the content to update the instance element at database, both on success
        (status "ACTIVE") and on failure (status "VIM_ERROR")
    """
    # computed before the 'try' so that it is always available for the error log
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        # converted to list because the keys of a Python 3 dictionary cannot be indexed
        interfaces = list(task.get("depends").get(dep_id).get("extra").get("interfaces").keys())
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]
        ip_proto = params.get("ip_proto")
        if ip_proto is not None:  # the protocol is optional, int() cannot be applied to None
            ip_proto = int(ip_proto)
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        source_port = params.get("source_port")
        destination_port = params.get("destination_port")
        definition = {"logical_source_port": interfaces[0]}
        if ip_proto:
            # well known protocol numbers are sent by name, any other is sent as a number
            definition["protocol"] = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        # if not CIDR is given for the IP addresses, add /32:
        if source_ip:
            if '/' not in source_ip:
                source_ip += '/32'
            definition["source_ip_prefix"] = source_ip
        if source_port:
            definition["source_port_range_min"] = source_port
            definition["source_port_range_max"] = source_port
        if destination_port:
            definition["destination_port_range_min"] = destination_port
            definition["destination_port_range_max"] = destination_port
        if destination_ip:
            if '/' not in destination_ip:
                destination_ip += '/32'
            definition["destination_ip_prefix"] = destination_ip

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return instance_element_update
1240
def del_classification(self, task):
    """
    Delete a Classification at VIM
    :param task: task dictionary; "vim_id" is the identifier at VIM. On return "status" is "FINISHED" if
        deleted or not found at VIM, or "FAILED" on any other VIM error; "error_msg" has the error, if any
    :return: None
    """
    classification_vim_id = task["vim_id"]
    try:
        self.vim.delete_classification(classification_vim_id)
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # an element not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
1257
def new_sfp(self, task):
    """
    Create a Service Function Path at VIM with the Service Functions and Classifications created by the
    tasks this one depends on (task["extra"]["depends_on"]). The "item" of every dependency tells if it is a
    Service Function ("instance_sfs") or a Classification ("instance_classifications")
    :param task: task dictionary. It is filled with the result: status, vim_id, error_msg and extra
    :return: a dict with the content to update the instance element at database, both on success
        (status "ACTIVE") and on failure (status "VIM_ERROR")
    """
    # computed before the 'try' so that it is always available for the error log
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
                           task.get("extra").get("depends_on")]
        sf_id_list = []
        classification_id_list = []
        for dep in depending_tasks:
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return instance_element_update
1295
def del_sfp(self, task):
    """
    Delete a Service Function Path at VIM
    :param task: task dictionary; "vim_id" is the identifier at VIM. On return "status" is "FINISHED" if
        deleted or not found at VIM, or "FAILED" on any other VIM error; "error_msg" has the error, if any
    :return: None
    """
    sfp_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfp(sfp_vim_id)
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # an element not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None