feature 8029 change RO to python3. Using vim plugins
[osm/RO.git] / RO / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is something to avoid if RO
28 is migrated to a non-relational database such as mongo db. Each vim_wim_actions references a different instance_Xxxxx.
29 In this case the "related" column contains the same value, to know they refer to the same vim. In case of deletion, if
30 there are related tasks using this element, it is not deleted; the vim_info needed to delete is transferred to another task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINISHED, but nothing has been done to complete the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (complete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of instance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS or instance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import queue
82 import logging
83 from osm_ro import vimconn
84 import yaml
85 from osm_ro.db_base import db_base_Exception
86 # TODO py3 BEGIN
class ovimException(Exception):
    """Exception type caught around the ovim (SDN) calls of this module; declared locally (see 'TODO py3')."""
89 # TODO py3 END
90 from copy import deepcopy
91
92 __author__ = "Alfonso Tierno, Pablo Montes"
93 __date__ = "$28-Sep-2017 12:07:15$"
94
95
def is_task_id(task_id):
    """
    Tell whether task_id is a task reference, that is, a string starting with "TASK-".
    :param task_id: value to check. Non-string values (None, integer task indexes, ...) are accepted and are never
        considered a task reference, instead of failing with AttributeError
    :return: True if task_id is a string that starts with "TASK-", False otherwise
    """
    return isinstance(task_id, str) and task_id.startswith("TASK-")
98
99
class VimThreadException(Exception):
    """Error raised while processing a task; _proccess_pending_tasks turns it into a FAILED task."""
102
103
class VimThreadExceptionNotFound(VimThreadException):
    """VimThreadException specialization; by its name, meant for elements that are not found."""
106
107
108 class vim_thread(threading.Thread):
# Delays in seconds added to time.time() by _proccess_pending_tasks to set when a task is processed again
REFRESH_BUILD = 5  # 5 seconds. Used when vim_status is "BUILD"
REFRESH_ACTIVE = 60  # 1 minute. Used for any vim_status without a specific delay
REFRESH_ERROR = 600  # 10 minutes. Used when vim_status is "ERROR" or "VIM_ERROR"
REFRESH_DELETE = 3600 * 10  # 10 hours. Used when vim_status is "DELETED" or the task status is "FAILED"
113
114 def __init__(self, task_lock, plugins, name=None, datacenter_name=None, datacenter_tenant_id=None,
115 db=None, db_lock=None, ovim=None):
116 """Init a thread.
117 Arguments:
118 'id' number of thead
119 'name' name of thread
120 'host','user': host ip or name to manage and user
121 'db', 'db_lock': database class and lock to use it in exclusion
122 """
123 threading.Thread.__init__(self)
124 self.plugins = plugins
125 self.vim = None
126 self.error_status = None
127 self.datacenter_name = datacenter_name
128 self.datacenter_tenant_id = datacenter_tenant_id
129 self.ovim = ovim
130 if not name:
131 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
132 else:
133 self.name = name
134 self.vim_persistent_info = {}
135 self.my_id = self.name[:64]
136
137 self.logger = logging.getLogger('openmano.vim.' + self.name)
138 self.db = db
139 self.db_lock = db_lock
140
141 self.task_lock = task_lock
142 self.task_queue = queue.Queue(2000)
143
144 def get_vimconnector(self):
145 try:
146 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
147 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
148 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
149 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
150 'user', 'passwd', 'dt.config as dt_config')
151 where_ = {"dt.uuid": self.datacenter_tenant_id}
152 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
153 vim = vims[0]
154 vim_config = {}
155 if vim["config"]:
156 vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader))
157 if vim["dt_config"]:
158 vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader))
159 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
160 vim_config['datacenter_id'] = vim.get('datacenter_id')
161
162 # get port_mapping
163 with self.db_lock:
164 vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
165 db_filter={"region": vim_config['datacenter_id'], "pci": None})
166
167 self.vim = self.plugins["rovim_" + vim["type"]].vimconnector(
168 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
169 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
170 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
171 user=vim['user'], passwd=vim['passwd'],
172 config=vim_config, persistent_info=self.vim_persistent_info
173 )
174 self.error_status = None
175 except Exception as e:
176 self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
177 self.vim = None
178 self.error_status = "Error loading vimconnector: {}".format(e)
179
def _get_db_task(self):
    """
    Read from database the next task to process for this VIM and lock it, together with all its related tasks,
    by writing self.my_id at the "worker" column.
    Candidates are vim_wim_actions of this datacenter_vim_id with status SCHEDULED, BUILD or DONE, not locked by
    another worker and with modified_at not later than the current time.
    :return: tuple (task, related_tasks). 'task' is the database row with "extra" loaded from yaml into a dict,
        "params" filled with a copy of extra["params"] (or None) and "depends" set to {} when extra contains
        "depends_on". 'related_tasks' is the list of rows with the same "related" and "item", ordered by
        created_at. Returns (None, None) if there is nothing to process or upon an unexpected exception
    """
    now = time.time()
    try:
        database_limit = 20
        task_related = None
        while True:
            # get 20 (database_limit) entries each time
            vim_actions = self.db.get_rows(FROM="vim_wim_actions",
                                           WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                  "status": ['SCHEDULED', 'BUILD', 'DONE'],
                                                  "worker": [None, self.my_id], "modified_at<=": now
                                                  },
                                           ORDER_BY=("modified_at", "created_at",),
                                           LIMIT=database_limit)
            if not vim_actions:
                return None, None
            # if vim_actions[0]["modified_at"] > now:
            #     return int(vim_actions[0] - now)
            for task in vim_actions:
                # block related task
                if task_related == task["related"]:
                    continue  # ignore if a locking has already tried for these task set
                task_related = task["related"]
                # lock ...
                # (FAILED tasks are included in the lock, although they are not candidates to be processed)
                self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
                                    WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                           "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                           "worker": [None, self.my_id],
                                           "related": task_related,
                                           "item": task["item"],
                                           })
                # ... and read all related and check if locked
                related_tasks = self.db.get_rows(FROM="vim_wim_actions",
                                                 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                        "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                                        "related": task_related,
                                                        "item": task["item"],
                                                        },
                                                 ORDER_BY=("created_at",))
                # check that all related tasks have been locked. If not release and try again. It can happen
                # for race conditions if a new related task has been inserted by nfvo in the process
                some_tasks_locked = False
                some_tasks_not_locked = False
                # NOTE(review): creation_task is computed but not used afterwards in this method
                creation_task = None
                for relate_task in related_tasks:
                    if relate_task["worker"] != self.my_id:
                        some_tasks_not_locked = True
                    else:
                        some_tasks_locked = True
                    if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
                        creation_task = relate_task
                if some_tasks_not_locked:
                    if some_tasks_locked:  # unlock
                        self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
                                            WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                   "worker": self.my_id,
                                                   "related": task_related,
                                                   "item": task["item"],
                                                   })
                    continue

                # task of creation must be the first in the list of related_task
                # NOTE(review): a failure here (AssertionError, or IndexError with an empty list) is caught by the
                # generic handler below and (None, None) is returned without releasing the lock taken above
                assert(related_tasks[0]["action"] in ("CREATE", "FIND"))

                # convert the database row into the task format stored at memory
                task["params"] = None
                if task["extra"]:
                    # NOTE(review): yaml.Loader can build arbitrary python objects; assumes the content of
                    # this column is trusted
                    extra = yaml.load(task["extra"], Loader=yaml.Loader)
                else:
                    extra = {}
                task["extra"] = extra
                if extra.get("depends_on"):
                    task["depends"] = {}
                if extra.get("params"):
                    task["params"] = deepcopy(extra["params"])
                return task, related_tasks
            # none of the entries read could be locked: the database is read again
    except Exception as e:
        self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
        return None, None
262
263 def _delete_task(self, task):
264 """
265 Determine if this task need to be done or superseded
266 :return: None
267 """
268
269 def copy_extra_created(copy_to, copy_from):
270 copy_to["created"] = copy_from["created"]
271 if copy_from.get("sdn_net_id"):
272 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
273 if copy_from.get("interfaces"):
274 copy_to["interfaces"] = copy_from["interfaces"]
275 if copy_from.get("created_items"):
276 if not copy_to.get("created_items"):
277 copy_to["created_items"] = {}
278 copy_to["created_items"].update(copy_from["created_items"])
279
280 task_create = None
281 dependency_task = None
282 deletion_needed = False
283 if task["status"] == "FAILED":
284 return # TODO need to be retry??
285 try:
286 # get all related tasks
287 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
288 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
289 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
290 "action": ["FIND", "CREATE"],
291 "related": task["related"],
292 },
293 ORDER_BY=("created_at",),
294 )
295 for related_task in related_tasks:
296 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
297 task_create = related_task
298 # TASK_CREATE
299 if related_task["extra"]:
300 extra_created = yaml.load(related_task["extra"], Loader=yaml.Loader)
301 if extra_created.get("created"):
302 deletion_needed = True
303 related_task["extra"] = extra_created
304 elif not dependency_task:
305 dependency_task = related_task
306 if task_create and dependency_task:
307 break
308
309 # mark task_create as FINISHED
310 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
311 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
312 "instance_action_id": task_create["instance_action_id"],
313 "task_index": task_create["task_index"]
314 })
315 if not deletion_needed:
316 return
317 elif dependency_task:
318 # move create information from task_create to relate_task
319 extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {}
320 extra_new_created["created"] = extra_created["created"]
321 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
322
323 self.db.update_rows("vim_wim_actions",
324 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
325 width=256),
326 "vim_id": task_create.get("vim_id")},
327 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
328 "instance_action_id": dependency_task["instance_action_id"],
329 "task_index": dependency_task["task_index"]
330 })
331 return False
332 else:
333 task["vim_id"] = task_create["vim_id"]
334 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
335 return True
336
337 except Exception as e:
338 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
339
def _refres_vm(self, task):
    """
    Call VIM to get VMs status
    It also compares each interface reported by the VIM with the one stored at task["extra"]["interfaces"];
    when different, the old SDN port is deleted, a new one is created if applicable and the instance_interfaces
    database entry is updated.
    :param task: task of a VM. task["extra"] and task["error_msg"] are updated with the status obtained
    :return: dictionary with the changes ("status", "error_msg" and optionally "vim_info") to be applied to the
        database by the caller, or None if nothing has changed
    """
    database_update = None

    vim_id = task["vim_id"]
    vm_to_refresh_list = [vim_id]
    try:
        vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        vim_info = vim_dict[vim_id]
    except vimconn.vimconnException as e:
        # Mark all tasks at VIM_ERROR status
        self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

    # check and update interfaces
    task_warning_msg = ""  # accumulates the SDN errors, which are appended to the VM error_msg
    for interface in vim_info.get("interfaces", ()):
        vim_interface_id = interface["vim_interface_id"]
        if vim_interface_id not in task["extra"]["interfaces"]:
            # NOTE(review): exc_info=True is used here outside of an exception handler
            self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
            continue
        task_interface = task["extra"]["interfaces"][vim_interface_id]
        task_vim_interface = task_interface.get("vim_info")
        if task_vim_interface != interface:
            # delete old port
            if task_interface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
                        task_interface["sdn_port_id"] = None
                except ovimException as e:
                    error_text = "ovimException deleting external_port={}: {}".format(
                        task_interface["sdn_port_id"], e)
                    self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                    task_warning_msg += error_text
                    # TODO Set error_msg at instance_nets instead of instance VMs

            # Create SDN port
            # only done when the interface has a SDN network and the VIM reports both compute_node and pci
            sdn_net_id = task_interface.get("sdn_net_id")
            if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                sdn_port_name = sdn_net_id + "." + task["vim_id"]
                sdn_port_name = sdn_port_name[:63]
                try:
                    with self.db_lock:
                        sdn_port_id = self.ovim.new_external_port(
                            {"compute_node": interface["compute_node"],
                             "pci": interface["pci"],
                             "vlan": interface.get("vlan"),
                             "net_id": sdn_net_id,
                             "region": self.vim["config"]["datacenter_id"],
                             "name": sdn_port_name,
                             "mac": interface.get("mac_address")})
                        task_interface["sdn_port_id"] = sdn_port_id
                # any exception is caught here, as Exception is part of the tuple
                except (ovimException, Exception) as e:
                    error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
                        format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
                    self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                    task_warning_msg += error_text
                    # TODO Set error_msg at instance_nets instead of instance VMs

            self.db.update_rows('instance_interfaces',
                                UPDATE={"mac_address": interface.get("mac_address"),
                                        "ip_address": interface.get("ip_address"),
                                        "vim_interface_id": interface.get("vim_interface_id"),
                                        "vim_info": interface.get("vim_info"),
                                        "sdn_port_id": task_interface.get("sdn_port_id"),
                                        "compute_node": interface.get("compute_node"),
                                        "pci": interface.get("pci"),
                                        "vlan": interface.get("vlan")},
                                WHERE={'uuid': task_interface["iface_id"]})
            # store what the VIM reported, to detect changes at the next refresh
            task_interface["vim_info"] = interface

    # check and update task and instance_vms database
    vim_info_error_msg = None
    if vim_info.get("error_msg"):
        vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
    elif task_warning_msg:
        vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
    task_vim_info = task["extra"].get("vim_info")
    task_error_msg = task.get("error_msg")
    task_vim_status = task["extra"].get("vim_status")
    # changes are reported only when status, error message or vim_info differ from what the task stores
    if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
            (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
        database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
        if vim_info.get("vim_info"):
            database_update["vim_info"] = vim_info["vim_info"]

        task["extra"]["vim_status"] = vim_info["status"]
        task["error_msg"] = vim_info_error_msg
        if vim_info.get("vim_info"):
            task["extra"]["vim_info"] = vim_info["vim_info"]

    return database_update
437
438 def _refres_net(self, task):
439 """Call VIM to get network status"""
440 database_update = None
441
442 vim_id = task["vim_id"]
443 net_to_refresh_list = [vim_id]
444 try:
445 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
446 vim_info = vim_dict[vim_id]
447 except vimconn.vimconnException as e:
448 # Mark all tasks at VIM_ERROR status
449 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
450 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
451
452 task_id = task["instance_action_id"] + "." + str(task["task_index"])
453 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
454
455 task_vim_info = task["extra"].get("vim_info")
456 task_vim_status = task["extra"].get("vim_status")
457 task_error_msg = task.get("error_msg")
458 task_sdn_net_id = task["extra"].get("sdn_net_id")
459
460 vim_info_status = vim_info["status"]
461 vim_info_error_msg = vim_info.get("error_msg")
462 # get ovim status
463 if task_sdn_net_id:
464 try:
465 with self.db_lock:
466 sdn_net = self.ovim.show_network(task_sdn_net_id)
467 except (ovimException, Exception) as e:
468 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
469 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
470 sdn_net = {"status": "ERROR", "last_error": text_error}
471 if sdn_net["status"] == "ERROR":
472 if not vim_info_error_msg:
473 vim_info_error_msg = str(sdn_net.get("last_error"))
474 else:
475 vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
476 self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
477 self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
478 vim_info_status = "ERROR"
479 elif sdn_net["status"] == "BUILD":
480 if vim_info_status == "ACTIVE":
481 vim_info_status = "BUILD"
482
483 # update database
484 if vim_info_error_msg:
485 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
486 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
487 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
488 task["extra"]["vim_status"] = vim_info_status
489 task["error_msg"] = vim_info_error_msg
490 if vim_info.get("vim_info"):
491 task["extra"]["vim_info"] = vim_info["vim_info"]
492 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
493 if vim_info.get("vim_info"):
494 database_update["vim_info"] = vim_info["vim_info"]
495 return database_update
496
def _proccess_pending_tasks(self, task, related_tasks):
    """
    Process a task that has been locked by _get_db_task: check its dependencies, execute the action against the
    VIM (or refresh the status of an element already created/found) and store the result at database.
    :param task: task to process, in the format returned by _get_db_task. It is modified with the result
    :param related_tasks: list of the tasks related to this one, as returned by _get_db_task. The first one
        is used as the reference for the other FIND/CREATE tasks over the same VIM element
    :return: None
    """
    old_task_status = task["status"]
    create_or_find = False  # if as result of processing this task something is created or found
    next_refresh = 0  # time for "modified_at"; when 0 it is calculated below from the resulting status

    try:
        if task["status"] == "SCHEDULED":
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            dependency_modified_at = 0
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = self._look_for_task(task["instance_action_id"], task_index)
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], task_index))
                # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
                # database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    dependency_modified_at = task_dependency["modified_at"]
                    break
                elif task_dependency["status"] == "FAILED":
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))

                # the dependency is stored under both the short and the complete identifiers
                task["depends"]["TASK-"+str(task_index)] = task_dependency
                task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
            if dependency_not_completed:
                # Move this task to the time dependency is going to be modified plus 10 seconds.
                # The lock of all the related tasks is released in the same update
                self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
                                    UPDATE={"worker": None},
                                    WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "worker": self.my_id,
                                           "related": task["related"],
                                           })
                # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                # if task["extra"]["tries"] > 3:
                #     raise VimThreadException(
                #         "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                #         "(task {}.{})".format(task["action"], task["item"],
                #                               task["instance_action_id"], task["task_index"],
                #                               task_dependency["instance_action_id"], task_dependency["task_index"]
                #                               task_dependency["action"], task_dependency["item"]))
                return

        # changes to apply to the database table indicated by task["item"]
        database_update = None
        if task["action"] == "DELETE":
            deleted_needed = self._delete_task(task)
            if not deleted_needed:
                task["status"] = "SUPERSEDED"  # with FINISHED instead of DONE it will not be refreshing
                task["error_msg"] = None

        if task["status"] == "SUPERSEDED":
            # not needed to do anything but update database with the new status
            database_update = None
        elif not self.vim:
            # the VIM connector could not be loaded (see get_vimconnector)
            task["status"] = "FAILED"
            task["error_msg"] = self.error_status
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
        elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
            # Do nothing, just copy values from one to another and updata database
            task["status"] = related_tasks[0]["status"]
            task["error_msg"] = related_tasks[0]["error_msg"]
            task["vim_id"] = related_tasks[0]["vim_id"]
            # NOTE(review): assumes related_tasks[0]["extra"] is not empty; yaml.load fails with None
            extra = yaml.load(related_tasks[0]["extra"], Loader=yaml.Loader)
            task["extra"]["vim_status"] = extra.get("vim_status")
            next_refresh = related_tasks[0]["modified_at"] + 0.001
            database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
                               "error_msg": task["error_msg"]}
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = task["vim_id"]
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = task["vim_id"]
        elif task["item"] == 'instance_vms':
            # NOTE(review): the vimconn.vimconnException raised for unknown actions/items in these branches is
            # not handled by the "except VimThreadException" below, unless it derives from that class
            if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
                database_update = self._refres_vm(task)
                create_or_find = True
            elif task["action"] == "CREATE":
                create_or_find = True
                database_update = self.new_vm(task)
            elif task["action"] == "DELETE":
                self.del_vm(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        elif task["item"] == 'instance_nets':
            if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
                database_update = self._refres_net(task)
                create_or_find = True
            elif task["action"] == "CREATE":
                create_or_find = True
                database_update = self.new_net(task)
            elif task["action"] == "DELETE":
                self.del_net(task)
            elif task["action"] == "FIND":
                database_update = self.get_net(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        elif task["item"] == 'instance_sfis':
            if task["action"] == "CREATE":
                create_or_find = True
                database_update = self.new_sfi(task)
            elif task["action"] == "DELETE":
                self.del_sfi(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        elif task["item"] == 'instance_sfs':
            if task["action"] == "CREATE":
                create_or_find = True
                database_update = self.new_sf(task)
            elif task["action"] == "DELETE":
                self.del_sf(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        elif task["item"] == 'instance_classifications':
            if task["action"] == "CREATE":
                create_or_find = True
                database_update = self.new_classification(task)
            elif task["action"] == "DELETE":
                self.del_classification(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        elif task["item"] == 'instance_sfps':
            if task["action"] == "CREATE":
                create_or_find = True
                database_update = self.new_sfp(task)
            elif task["action"] == "DELETE":
                self.del_sfp(task)
            else:
                raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
        else:
            raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
            # TODO
    except VimThreadException as e:
        # the task is marked as FAILED and the database entry of the item gets the VIM_ERROR status
        task["error_msg"] = str(e)
        task["status"] = "FAILED"
        database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
        if task["item"] == 'instance_vms':
            database_update["vim_vm_id"] = None
        elif task["item"] == 'instance_nets':
            database_update["vim_net_id"] = None

    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
        task_id, task["item"], task["action"], task["status"],
        task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
    try:
        # calculate when this task must be processed again, unless it has been already set above
        if not next_refresh:
            if task["status"] == "DONE":
                next_refresh = time.time()
                if task["extra"].get("vim_status") == "BUILD":
                    next_refresh += self.REFRESH_BUILD
                elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"):
                    next_refresh += self.REFRESH_ERROR
                elif task["extra"].get("vim_status") == "DELETED":
                    next_refresh += self.REFRESH_DELETE
                else:
                    next_refresh += self.REFRESH_ACTIVE
            elif task["status"] == "FAILED":
                next_refresh = time.time() + self.REFRESH_DELETE

        if create_or_find:
            # modify all related task with action FIND/CREATED non SCHEDULED
            # (this update also matches the own task when not SCHEDULED; it is written again just below)
            self.db.update_rows(
                table="vim_wim_actions", modified_time=next_refresh + 0.001,
                UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
                        "error_msg": task["error_msg"],
                        },

                WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                       "worker": self.my_id,
                       "action": ["FIND", "CREATE"],
                       "related": task["related"],
                       "status<>": "SCHEDULED",
                       })
        # modify own task
        self.db.update_rows(
            table="vim_wim_actions", modified_time=next_refresh,
            UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
                    "error_msg": task["error_msg"],
                    "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
            WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
        # Unlock tasks
        self.db.update_rows(
            table="vim_wim_actions", modified_time=0,
            UPDATE={"worker": None},
            WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                   "worker": self.my_id,
                   "related": task["related"],
                   })

        # Update table instance_actions
        # counters are incremented only the first time the task leaves the SCHEDULED status
        if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
            self.db.update_rows(
                table="instance_actions",
                UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
                WHERE={"uuid": task["instance_action_id"]})
        if database_update:
            # update the entries of the item table (instance_vms, instance_nets, ...) with the same "related"
            where_filter = {"related": task["related"]}
            if task["item"] == "instance_nets" and task["datacenter_vim_id"]:
                where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"]
            self.db.update_rows(table=task["item"],
                                UPDATE=database_update,
                                WHERE=where_filter)
    except db_base_Exception as e:
        self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
706
707 def insert_task(self, task):
708 try:
709 self.task_queue.put(task, False)
710 return None
711 except queue.Full:
712 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
713
714 def del_task(self, task):
715 with self.task_lock:
716 if task["status"] == "SCHEDULED":
717 task["status"] = "SUPERSEDED"
718 return True
719 else: # task["status"] == "processing"
720 self.task_lock.release()
721 return False
722
723 def run(self):
724 self.logger.debug("Starting")
725 while True:
726 self.get_vimconnector()
727 self.logger.debug("Vimconnector loaded")
728 reload_thread = False
729
730 while True:
731 try:
732 while not self.task_queue.empty():
733 task = self.task_queue.get()
734 if isinstance(task, list):
735 pass
736 elif isinstance(task, str):
737 if task == 'exit':
738 return 0
739 elif task == 'reload':
740 reload_thread = True
741 break
742 self.task_queue.task_done()
743 if reload_thread:
744 break
745
746 task, related_tasks = self._get_db_task()
747 if task:
748 self._proccess_pending_tasks(task, related_tasks)
749 else:
750 time.sleep(5)
751
752 except Exception as e:
753 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
754
755 self.logger.debug("Finishing")
756
def _look_for_task(self, instance_action_id, task_id):
    """
    Look for a concrete task at vim_wim_actions database table
    :param instance_action_id: The instance_action_id
    :param task_id: Can have several formats:
        <task index>: integer
        TASK-<task index> :backward compatibility,
        [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
    :return: Task dictionary or None if not found. "extra" is returned as a dictionary (empty if nothing is
        stored), "params" is taken from "extra" and "depends" is an empty dictionary
    """
    task_index = task_id
    if not isinstance(task_id, int):
        if task_id.startswith("TASK-"):
            task_id = task_id[5:]
        # text before the last '.' (if any) is the instance_action_id, text after it is the task index
        ins_action_id, _, task_index = task_id.rpartition(".")
        if ins_action_id:
            instance_action_id = ins_action_id

    where = {"instance_action_id": instance_action_id, "task_index": task_index}
    tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE=where)
    if not tasks:
        return None

    task = tasks[0]
    task["params"] = None
    task["depends"] = {}
    if not task["extra"]:
        task["extra"] = {}
        return task
    # NOTE(review): yaml.Loader can build arbitrary python objects. Consider a safe loader if the content of
    # "extra" may come from an untrusted source
    extra = yaml.load(task["extra"], Loader=yaml.Loader)
    task["extra"] = extra
    task["params"] = extra.get("params")
    return task
790
@staticmethod
def _format_vim_error_msg(error_text, max_length=1024):
    """
    Shorten an error text keeping its beginning and its end, joined by " ... "
    :param error_text: text to shorten. Empty or None values are returned unchanged
    :param max_length: texts with this length or longer are shortened
    :return: error_text if it is shorter than max_length; otherwise a text shorter than max_length. When
        max_length is too small to keep text at both sides of " ... ", the text is just cut at max_length
    """
    if not error_text or len(error_text) < max_length:
        return error_text
    head_len = max_length // 2 - 3
    tail_len = (max_length + 1) // 2 - 3
    if head_len <= 0 or tail_len <= 0:
        # a zero tail would be sliced as [-0:], which returns the whole text instead of nothing
        return error_text[:max(max_length, 0)]
    return error_text[:head_len] + " ... " + error_text[-tail_len:]
796
def new_vm(self, task):
    """
    Create a VM at VIM
    :param task: task dictionary. task["params"] are the positional arguments for vim.new_vminstance; item 5 is
        the list of interfaces. An interface "net_id" that is a task id is replaced by the "vim_id" of the
        network task found at task["depends"]. The task is updated with the result (status, vim_id, error_msg
        and extra: interfaces, created, created_items, vim_status)
    :return: dictionary with the content to update the instance element at database
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends")
        for net in params[5]:
            if "net_id" not in net or not is_task_id(net["net_id"]):
                continue
            # change task_id into network_id
            network_id = task["depends"][net["net_id"]].get("vim_id")
            if not network_id:
                raise VimThreadException(
                    "Cannot create VM because depends on a network not created or found: " +
                    str(depends[net["net_id"]]["error_msg"]))
            net["net_id"] = network_id
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for sdn_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            iface_info = {"iface_id": iface["uuid"]}
            task_interfaces[iface["vim_id"]] = iface_info
            rows = self.db.get_rows(
                SELECT=('sdn_net_id', 'interface_id'),
                FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                WHERE={'ii.uuid': iface["uuid"]})
            if not rows:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
                                                                                                    iface["uuid"]),
                                     exc_info=True)
                continue
            iface_info["sdn_net_id"] = rows[0]['sdn_net_id']
            iface_info["interface_id"] = rows[0]['interface_id']

        task["vim_info"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["extra"]["vim_status"] = "BUILD"
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        return {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
849
def del_vm(self, task):
    """
    Delete a VM at VIM, deleting before the sdn ports stored at its interfaces
    :param task: task dictionary. Uses "vim_id" and, from "extra", the optional "interfaces" and "created_items".
        "status" is set to "FINISHED" on success or when the VM is not found at VIM, and to "FAILED" on any
        other vimconn error. "error_msg" is filled with the error, if any
    :return: None
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # a dictionary is needed as default because '.values()' is used below; tasks without interfaces are valid
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
                except ovimException as e:
                    # a failure deleting the sdn port does not stop the VM deletion
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
        return None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
            return None
        task["status"] = "FAILED"
        return None
878
def _get_net_internal(self, task, filter_param):
    """
    Common code for get_net and new_net. It looks for a network on VIM with the filter_params
    :param task: task for this find or find-or-create action. It is updated with the network found
    :param filter_param: parameters to send to the vimconnector
    :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
        when network is not found or found more than one
    """
    vim_nets = self.vim.get_network_list(filter_param)
    if not vim_nets:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(vim_nets) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = vim_nets[0]["id"]

    # Discover if this network is managed by a sdn controller
    rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                            WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
                            ORDER="instance_scenario_id")
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    extra = task["extra"]
    extra["vim_info"] = {}
    extra["created"] = False
    extra["vim_status"] = "BUILD"
    extra["sdn_net_id"] = sdn_net_id
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
912
def get_net(self, task):
    """
    Look for an existing network at VIM using task["params"][0] as filter
    :param task: task dictionary. On error its "status" is set to "FAILED", "vim_id" to None and "error_msg" is
        filled
    :return: dictionary with the content to update the instance_nets database table
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        return self._get_net_internal(task, task["params"][0])

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} get-net: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        task["error_msg"] = error_text
        return {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
929
def new_net(self, task):
    """
    Find or create a network at VIM and, if needed, at the SDN controller
    :param task: task dictionary
        task["extra"]["find"]: optional. Its first item is the filter used to look for an existing network. If
            exactly one is found it is used and nothing is created
        task["params"]: arguments to create the network: name, type, a third argument passed as it is to
            vim.new_network, and optionally the wim account name
        The task is updated with the result (status, vim_id, error_msg and extra content)
    :return: dictionary with the content to update the instance_nets database table
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return instance_element_update
            except VimThreadExceptionNotFound:
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id, created_items = self.vim.new_network(*params[0:3])

        net_name = params[0]
        net_type = params[1]
        wim_account_name = None
        if len(params) >= 4:
            wim_account_name = params[3]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # use get() also here: 'encapsulation' can be missing, and a KeyError would hide this error
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)

            # 'wim_external_ports' is treated as disabled when it is not present at the VIM config
            if wim_account_name and self.vim.config.get("wim_external_ports"):
                # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
                action_text = "attaching external port to ovim network"
                sdn_port_name = "external_port"
                sdn_port_data = {
                    "compute_node": "__WIM:" + wim_account_name[0:58],
                    "pci": None,
                    "vlan": network["vlan"],
                    "net_id": sdn_net_id,
                    "region": self.vim["config"]["datacenter_id"],
                    "name": sdn_port_name,
                }
                try:
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                except ovimException:
                    sdn_port_data["compute_node"] = "__WIM"
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
                                                                                        sdn_net_id))
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["vim_status"] = "BUILD"
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by the find step, e.g. when more than one network matches
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        # ids are stored also on failure: they contain whatever was created before the error
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return instance_element_update
1011
def del_net(self, task):
    """
    Delete a network at VIM and its related SDN network
    :param task: task dictionary. Uses "vim_id" and, from "extra", "sdn_net_id" and "created_items". "status" is
        set to "FINISHED" on success or when the network is not found at VIM, and to "FAILED" on any other
        error. "error_msg" is filled with the error, if any
    :return: None
    """
    net_vim_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if net_vim_id:
            self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            with self.db_lock:
                port_list = self.ovim.get_ports(columns={'uuid'},
                                                filter={'name': 'external_port', 'net_id': sdn_net_id})
                for port in port_list:
                    self.ovim.delete_port(port['uuid'], idempotent=True)
                self.ovim.delete_network(sdn_net_id, idempotent=True)
    except ovimException as e:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(e)))
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
            return None
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
        return None
    task["status"] = "FAILED"
    return None
1041
1042 # Service Function Instances
def new_sfi(self, task):
    """
    Create a Service Function Instance at VIM
    :param task: task dictionary. The ingress and egress interfaces given at task["extra"]["params"] are looked
        for at the "interfaces" of the first task this one depends on. The task is updated with the result
        (status, vim_id, error_msg and extra content)
    :return: dictionary with the content to update the instance element at database, or None if the ingress or
        egress interface is not found
    """
    try:
        # Waits for interfaces to be ready (avoids failure)
        time.sleep(1)
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        interfaces = task["depends"][dep_id]["extra"].get("interfaces")

        ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
        egress_interface_id = task.get("extra").get("params").get("egress_interface_id")

        def vim_interface_of(interface_id):
            # first interface at VIM whose "interface_id" matches, None if there is not any
            return next((vim_interface for vim_interface, interface_data in interfaces.items()
                         if interface_data.get("interface_id") == interface_id), None)

        ingress_vim_interface_id = vim_interface_of(ingress_interface_id)
        if ingress_interface_id != egress_interface_id:
            egress_vim_interface_id = vim_interface_of(egress_interface_id)
        else:
            egress_vim_interface_id = ingress_vim_interface_id

        if not (ingress_vim_interface_id and egress_vim_interface_id):
            error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
                ingress_vim_interface_id, egress_vim_interface_id)
            self.logger.error(error_text)
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            return None

        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
        # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
        name = "sfi-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, [ingress_vim_interface_id], [egress_vim_interface_id],
                                      sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sfi_id
        return {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1101
def del_sfi(self, task):
    """
    Delete a Service Function Instance at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" on success
        or when it is not found at VIM, and to "FAILED" on any other vimconn error. "error_msg" is filled with
        the error, if any
    :return: None
    """
    vim_id = task["vim_id"]
    try:
        self.vim.delete_sfi(vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
1118
def new_sf(self, task):
    """
    Create a Service Function at VIM with the Service Function Instances of the tasks this one depends on
    :param task: task dictionary. The "vim_id" of every task listed at task["extra"]["depends_on"] is taken from
        task["depends"]. The task is updated with the result (status, vim_id, error_msg and extra content)
    :return: dictionary with the content to update the instance element at database
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
        sfi_id_list = [task.get("depends").get(dep_task).get("vim_id") for dep_task in depending_tasks]
        name = "sf-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sf_id
        return {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1150
def del_sf(self, task):
    """
    Delete a Service Function at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" on success
        or when it is not found at VIM, and to "FAILED" on any other vimconn error. "error_msg" is filled with
        the error, if any
    :return: None
    """
    vim_id = task["vim_id"]
    try:
        self.vim.delete_sf(vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
1167
def new_classification(self, task):
    """
    Create a Classification at VIM
    :param task: task dictionary. task["params"] is a dictionary with the optional match fields: ip_proto,
        source_ip, destination_ip, source_port and destination_port. The first interface of the first task this
        one depends on is used as logical source port. The task is updated with the result (status, vim_id,
        error_msg and extra content)
    :return: dictionary with the content to update the instance element at database
    """
    try:
        params = task["params"]
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        # a list is needed: dictionary keys cannot be indexed in python3
        interfaces = list(task.get("depends").get(dep_id).get("extra").get("interfaces"))
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-{}".format(task["item_id"][:8])
        # ip_proto is optional, so it is converted to integer only when provided
        ip_proto = params.get("ip_proto")
        ip_proto = int(ip_proto) if ip_proto else None
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        source_port = params.get("source_port")
        destination_port = params.get("destination_port")
        definition = {"logical_source_port": interfaces[0]}
        if ip_proto:
            # well known protocol numbers are sent by name, any other number is sent as it is
            definition["protocol"] = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        if source_ip:
            # if not CIDR is given for the IP addresses, add /32:
            if '/' not in source_ip:
                source_ip += '/32'
            definition["source_ip_prefix"] = source_ip
        if source_port:
            definition["source_port_range_min"] = source_port
            definition["source_port_range_max"] = source_port
        if destination_port:
            definition["destination_port_range_min"] = destination_port
            definition["destination_port_range_max"] = destination_port
        if destination_ip:
            if '/' not in destination_ip:
                destination_ip += '/32'
            definition["destination_ip_prefix"] = destination_ip

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return instance_element_update
1232
def del_classification(self, task):
    """
    Delete a Classification at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" on success
        or when it is not found at VIM, and to "FAILED" on any other vimconn error. "error_msg" is filled with
        the error, if any
    :return: None
    """
    vim_id = task["vim_id"]
    try:
        self.vim.delete_classification(vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
1249
def new_sfp(self, task):
    """
    Create a Service Function Path at VIM
    :param task: task dictionary. The tasks listed at task["extra"]["depends_on"] are taken from task["depends"]:
        the "vim_id" of those with item "instance_sfs" are used as service functions and the "vim_id" of those
        with item "instance_classifications" as classifications. The task is updated with the result (status,
        vim_id, error_msg and extra content)
    :return: dictionary with the content to update the instance element at database
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
                           task.get("extra").get("depends_on")]
        sf_id_list = []
        classification_id_list = []
        for dep in depending_tasks:
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        # the message names the element this method creates, so it is not confused with the new_sf errors
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return instance_element_update
1287
def del_sfp(self, task):
    """
    Delete a Service Function Path at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" on success
        or when it is not found at VIM, and to "FAILED" on any other vimconn error. "error_msg" is filled with
        the error, if any
    :return: None
    """
    vim_id = task["vim_id"]
    try:
        self.vim.delete_sfp(vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None