Implement feature 5949
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 can contain an int (single index on the same instance-action) or str (complete action ID)
41 sdn_net_id: used for net.
42 tries:
43 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
44 iface_id: uuid of instance_interfaces
45 sdn_port_id:
46 sdn_net_id:
47 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
48 created: False if the VIM element is not created by other actions, and it should not be deleted
49 vim_status: VIM status of the element. Stored also at database in the instance_XXX
50 M depends: dict with task_index(from depends_on) to task class
51 M params: same as extra[params] but with the resolved dependencies
52 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_wim_actions
53 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_wim_actions
54 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
55 MD created_at: task creation time
56 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
57
58 """
59
60 import threading
61 import time
62 import Queue
63 import logging
64 import vimconn
65 import vimconn_openvim
66 import vimconn_aws
67 import vimconn_opennebula
68 import vimconn_openstack
69 import vimconn_vmware
70 import yaml
71 from db_base import db_base_Exception
72 from lib_osm_openvim.ovim import ovimException
73 from copy import deepcopy
74
75 __author__ = "Alfonso Tierno, Pablo Montes"
76 __date__ = "$28-Sep-2017 12:07:15$"
77
78 vim_module = {
79 "openvim": vimconn_openvim,
80 "aws": vimconn_aws,
81 "opennebula": vimconn_opennebula,
82 "openstack": vimconn_openstack,
83 "vmware": vimconn_vmware,
84 }
85
def is_task_id(task_id):
    """Return True when the text task_id starts with the task reference prefix "TASK-"."""
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
88
89
class VimThreadException(Exception):
    """Base error of this module; raised when a task cannot be carried out by the vim thread."""
92
93
class VimThreadExceptionNotFound(VimThreadException):
    """VimThreadException specialization for a 'not found' condition."""
96
97
98 class vim_thread(threading.Thread):
99 REFRESH_BUILD = 5 # 5 seconds
100 REFRESH_ACTIVE = 60 # 1 minute
101
def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
             db=None, db_lock=None, ovim=None):
    """Init a thread.
    Arguments:
        'task_lock': lock taken by this thread while it checks or changes the status of a task
        'name': name of thread. If empty it is built as "<datacenter_name>.<datacenter_tenant_id>"
        'datacenter_name': name of the datacenter attended by this thread
        'datacenter_tenant_id': uuid of the datacenter_tenants database entry attended by this thread
        'db', 'db_lock': database class and lock to use it in exclusion
        'ovim': object used for the SDN operations (delete_port, new_external_port, show_network)
    """
    threading.Thread.__init__(self)
    self.vim = None
    self.error_status = None
    self.datacenter_name = datacenter_name
    self.datacenter_tenant_id = datacenter_tenant_id
    self.ovim = ovim
    if not name:
        # 'vimconn' is an imported module, so the former vimconn["id"] lookup could only raise TypeError.
        # The default name is built from the identifiers received as arguments instead.
        self.name = "{}.{}".format(datacenter_name, datacenter_tenant_id)
    else:
        self.name = name
    # dictionary handed to the vimconnector as 'persistent_info' every time it is (re)created
    self.vim_persistent_info = {}

    self.logger = logging.getLogger('openmano.vim.'+self.name)
    self.db = db
    self.db_lock = db_lock

    self.task_lock = task_lock
    # input queue filled by insert_task() and consumed by run()
    self.task_queue = Queue.Queue(2000)

    # Contains time ordered task list for refreshing the status of VIM VMs and nets
    self.refresh_tasks = []

    # Contains time ordered task list for creation, deletion of VIM VMs and nets
    self.pending_tasks = []

    # It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
    #     <item><item_id>:
    #         -   <task1>  # e.g. CREATE task
    #             <task2>  # e.g. DELETE task
    self.grouped_tasks = {}
142
def get_vimconnector(self):
    """(Re)create self.vim, the connector for the VIM account attended by this thread.

    Reads from database the datacenter_tenants entry self.datacenter_tenant_id joined with its datacenter,
    merges the datacenter 'config' with the account 'dt_config' (the latter prevails on repeated keys) and
    instantiates the 'vimconnector' class of the module registered at vim_module for the VIM 'type'.
    On success self.error_status is cleared. On any failure the error is logged, self.vim is set to None
    and self.error_status gets a descriptive text.
    :return: None
    """
    try:
        from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
        select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
                   'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
                   'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
                   'user', 'passwd', 'dt.config as dt_config')
        where_ = {"dt.uuid": self.datacenter_tenant_id}
        with self.db_lock:
            vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
        if not vims:
            # Without this check the failure was reported as an opaque "list index out of range"
            raise LookupError("vim_account not found at database")
        vim = vims[0]
        vim_config = {}
        # NOTE(review): yaml.load without an explicit safe Loader can build arbitrary objects. The text
        # comes from the database; consider yaml.safe_load if those columns can hold untrusted content.
        if vim["config"]:
            vim_config.update(yaml.load(vim["config"]))
        if vim["dt_config"]:
            vim_config.update(yaml.load(vim["dt_config"]))
        vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
        vim_config['datacenter_id'] = vim.get('datacenter_id')

        self.vim = vim_module[vim["type"]].vimconnector(
            uuid=vim['datacenter_id'], name=vim['datacenter_name'],
            tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
            url=vim['vim_url'], url_admin=vim['vim_url_admin'],
            user=vim['user'], passwd=vim['passwd'],
            config=vim_config, persistent_info=self.vim_persistent_info
        )
        self.error_status = None
    except Exception as e:
        # Deliberately broad: any problem (database, yaml, unknown VIM type, connector) leaves the thread
        # without connector, and the reason is kept at self.error_status
        self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
        self.vim = None
        self.error_status = "Error loading vimconnector: {}".format(e)
174
def _reload_vim_actions(self):
    """
    Read from database the vim_wim_actions of this VIM account and reload them at memory.
    The rows are read in pages of 'database_limit' entries ordered by item_id, item and created_at, and
    grouped by element (item + item_id). Each group is passed to self._insert_pending_tasks, which fills
    self.pending_tasks, self.refresh_tasks and self.grouped_tasks. A group is discarded when it contains a
    DELETE task whose status is not SCHEDULED, because nothing is left to do for that element.
    Any exception is logged and never propagated.
    :return: None
    """
    try:
        action_completed = False  # True once the current group is known to need nothing else
        task_list = []            # tasks collected for the current group
        old_action_key = None     # key (item + item_id) of the group being collected

        # (item_id, item, created_at) of the last row of the previous page; used to skip repeated rows
        old_item_id = ""
        old_item = ""
        old_created_at = 0.0
        database_limit = 200
        # NOTE(review): looks like this never ends if one item_id owns a whole page of rows, because the
        # next query (item_id >= old_item_id) would return the very same page again - confirm
        while True:
            # get 200 (database_limit) entries each time
            with self.db_lock:
                vim_actions = self.db.get_rows(FROM="vim_wim_actions",
                                               WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                      "item_id>=": old_item_id},
                                               ORDER_BY=("item_id", "item", "created_at",),
                                               LIMIT=database_limit)
            for task in vim_actions:
                item = task["item"]
                item_id = task["item_id"]

                # skip the first entries that are already processed in the previous pool of 200
                if old_item_id:
                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                        old_item_id = False  # next one will be a new un-processed task
                    continue

                action_key = item + item_id
                if old_action_key != action_key:
                    # a new group starts here: flush the previous one before collecting this one
                    if not action_completed and task_list:
                        # This will fill needed task parameters into memory, and insert the task if needed in
                        # self.pending_tasks or self.refresh_tasks
                        try:
                            self._insert_pending_tasks(task_list)
                        except Exception as e:
                            self.logger.critical(
                                "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                exc_info=True)
                    task_list = []
                    old_action_key = action_key
                    action_completed = False
                elif action_completed:
                    # remaining rows of a group that is already known to be completed
                    continue

                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                    task_list.append(task)
                elif task["action"] == "DELETE":
                    # action completed because deleted and status is not SCHEDULED. Not needed anything
                    action_completed = True
            if len(vim_actions) == database_limit:
                # a full page was read, so there can be more rows.
                # update variables for get the next database iteration
                old_item_id = item_id
                old_item = item
                old_created_at = task["created_at"]
            else:
                break
        # Last actions group need to be inserted too
        if not action_completed and task_list:
            try:
                self._insert_pending_tasks(task_list)
            except Exception as e:
                self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                     exc_info=True)
        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
            len(self.pending_tasks), len(self.refresh_tasks)))
    except Exception as e:
        self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
247
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements.
    It takes from self.refresh_tasks the tasks whose refresh time (task['modified_at']) has been reached,
    asks the VIM for the status of their VMs and nets, updates the database (instance_vms, instance_nets,
    instance_interfaces, vim_wim_actions) when something has changed and schedules every task again for a
    later refresh (REFRESH_BUILD seconds if the element is at BUILD status, REFRESH_ACTIVE otherwise).
    :return: number of tasks taken from self.refresh_tasks in this call
    """
    now = time.time()
    nb_processed = 0
    # vim_ids to ask for to the VIM, and the tasks that reference each vim_id
    vm_to_refresh_list = []
    net_to_refresh_list = []
    vm_to_refresh_dict = {}
    net_to_refresh_dict = {}
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                # superseded tasks are just dropped from the refresh list
                self.refresh_tasks.pop(0)
                continue
            if task['modified_at'] > now:
                # _insert_refresh keeps the list ordered by modified_at, so nothing else is due yet
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            if task["vim_id"] not in vm_to_refresh_dict:
                vm_to_refresh_dict[task["vim_id"]] = [task]
                vm_to_refresh_list.append(task["vim_id"])
            else:
                vm_to_refresh_dict[task["vim_id"]].append(task)
        elif task["item"] == 'instance_nets':
            if task["vim_id"] not in net_to_refresh_dict:
                net_to_refresh_dict[task["vim_id"]] = [task]
                net_to_refresh_list.append(task["vim_id"])
            else:
                net_to_refresh_dict[task["vim_id"]].append(task)
        else:
            # other kind of items are logged and not inserted again in the refresh list
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
            vim_dict = {}
            for vim_id in vm_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():

            # look for task
            for task in vm_to_refresh_dict[vim_id]:
                task_need_update = False  # True when the vim_wim_actions row must be rewritten
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

                # check and update interfaces
                task_warning_msg = ""  # accumulates the SDN errors, appended later to the VM error_msg
                for interface in vim_info.get("interfaces", ()):
                    vim_interface_id = interface["vim_interface_id"]
                    if vim_interface_id not in task["extra"]["interfaces"]:
                        self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                            task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                        continue
                    task_interface = task["extra"]["interfaces"][vim_interface_id]
                    task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                    # only act when the VIM reports something different from the last known information
                    if task_vim_interface != interface:
                        # delete old port
                        if task_interface.get("sdn_port_id"):
                            try:
                                with self.db_lock:
                                    self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
                                task_interface["sdn_port_id"] = None
                                task_need_update = True
                            except ovimException as e:
                                error_text = "ovimException deleting external_port={}: {}".format(
                                    task_interface["sdn_port_id"], e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        # Create SDN port
                        sdn_net_id = task_interface.get("sdn_net_id")
                        if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                            sdn_port_name = sdn_net_id + "." + task["vim_id"]
                            sdn_port_name = sdn_port_name[:63]  # name is cut to 63 characters
                            try:
                                with self.db_lock:
                                    sdn_port_id = self.ovim.new_external_port(
                                        {"compute_node": interface["compute_node"],
                                         "pci": interface["pci"],
                                         "vlan": interface.get("vlan"),
                                         "net_id": sdn_net_id,
                                         "region": self.vim["config"]["datacenter_id"],
                                         "name": sdn_port_name,
                                         "mac": interface.get("mac_address")})
                                task_interface["sdn_port_id"] = sdn_port_id
                                task_need_update = True
                            except (ovimException, Exception) as e:
                                error_text = "ovimException creating new_external_port compute_node={}"\
                                             " pci={} vlan={} {}".format(
                                                 interface["compute_node"],
                                                 interface["pci"],
                                                 interface.get("vlan"), e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        with self.db_lock:
                            self.db.update_rows(
                                'instance_interfaces',
                                UPDATE={"mac_address": interface.get("mac_address"),
                                        "ip_address": interface.get("ip_address"),
                                        "vim_info": interface.get("vim_info"),
                                        "sdn_port_id": task_interface.get("sdn_port_id"),
                                        "compute_node": interface.get("compute_node"),
                                        "pci": interface.get("pci"),
                                        "vlan": interface.get("vlan")},
                                WHERE={'uuid': task_interface["iface_id"]})
                        # remember what the VIM reported, to detect changes at the next refresh
                        task["vim_interfaces"][vim_interface_id] = interface

                # check and update task and instance_vms database
                vim_info_error_msg = None
                if vim_info.get("error_msg"):
                    vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
                elif task_warning_msg:
                    vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
                task_vim_info = task.get("vim_info")
                task_error_msg = task.get("error_msg")
                task_vim_status = task["extra"].get("vim_status")
                if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    task["extra"]["vim_status"] = vim_info["status"]
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    task_need_update = True

                if task_need_update:
                    with self.db_lock:
                        self.db.update_rows(
                            'vim_wim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                # schedule the next refresh: sooner while the element is still being built
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
            vim_dict = {}
            for vim_id in net_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            for task in net_to_refresh_dict[vim_id]:
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))

                task_vim_info = task.get("vim_info")
                task_vim_status = task["extra"].get("vim_status")
                task_error_msg = task.get("error_msg")
                task_sdn_net_id = task["extra"].get("sdn_net_id")

                vim_info_status = vim_info["status"]
                vim_info_error_msg = vim_info.get("error_msg")
                # get ovim status
                if task_sdn_net_id:
                    try:
                        with self.db_lock:
                            sdn_net = self.ovim.show_network(task_sdn_net_id)
                    except (ovimException, Exception) as e:
                        text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
                        self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
                        sdn_net = {"status": "ERROR", "last_error": text_error}
                    # combine VIM and SDN status: an SDN error prevails, and an SDN net at BUILD status
                    # downgrades a VIM ACTIVE to BUILD
                    if sdn_net["status"] == "ERROR":
                        if not vim_info_error_msg:
                            vim_info_error_msg = str(sdn_net.get("last_error"))
                        else:
                            # each text is shortened so that both fit together with the fixed labels
                            vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                                self._format_vim_error_msg(vim_info_error_msg, 1024//2-14),
                                self._format_vim_error_msg(sdn_net["last_error"], 1024//2-14))
                        vim_info_status = "ERROR"
                    elif sdn_net["status"] == "BUILD":
                        if vim_info_status == "ACTIVE":
                            vim_info_status = "BUILD"

                # update database
                if vim_info_error_msg:
                    vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
                if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    task["extra"]["vim_status"] = vim_info_status
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                        self.db.update_rows(
                            'vim_wim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                # schedule the next refresh: sooner while the element is still being built
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
477
def _insert_refresh(self, task, threshold_time=None):
    """Schedule a task for a status refresh.
    The task is stored at self.refresh_tasks, which is kept ordered by task['modified_at']; the task is
    placed just before the first entry whose time is later than threshold_time. Nothing is done when there
    is no VIM connector (self.vim).
    It is assumed that this is called inside this thread
    :param task: task dictionary. Its 'modified_at' is overwritten with the refresh time
    :param threshold_time: time at which the task must be refreshed. Current time when empty
    """
    if not self.vim:
        return
    task["modified_at"] = threshold_time = threshold_time if threshold_time else time.time()
    task_name = "-".join((task["item"][9:], task["action"]))
    task_id = "{}.{}".format(task["instance_action_id"], str(task["task_index"]))
    # position of the first queued task that must be refreshed later than this one (end of list if none)
    position = next((place for place, queued in enumerate(self.refresh_tasks)
                     if queued["modified_at"] > threshold_time),
                    len(self.refresh_tasks))
    self.refresh_tasks.insert(position, task)
    self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
        task_id, task_name, task["modified_at"], position))
498
def _remove_refresh(self, task_name, vim_id):
    """Remove a task with this name and vim_id from the list of refreshing elements.
    Only the first matching task of self.refresh_tasks is removed.
    It is assumed that this is called inside this thread outside _refres_elements method
    :param task_name: value compared with the 'name' entry of each task
    :param vim_id: value compared with the 'vim_id' entry of each task
    :return: True if self.refresh_tasks is modified (task is found), False if not found
    """
    # NOTE(review): the tasks handled in this file carry no visible 'name' entry (see how _insert_refresh
    # builds its task_name) - confirm that callers only use this with tasks that have that key
    for index, candidate in enumerate(self.refresh_tasks):
        if candidate["name"] == task_name and candidate["vim_id"] == vim_id:
            # The former code tested "if not index_to_delete", so the task was deleted only when found at
            # position 0 and True was returned for any other position without removing anything
            del self.refresh_tasks[index]
            return True
    return False
515
def _proccess_pending_tasks(self):
    """Execute against the VIM the tasks stored at self.pending_tasks.
    For every task: it checks that the tasks it depends on are completed (a task waiting for a SCHEDULED
    dependency goes to the end of the list, up to 3 times); it calls the method that performs the action
    over the item; it inserts the created or found elements in the refresh list; and it updates the database
    tables vim_wim_actions, instance_actions and the one of the item.
    It stops when the list gets empty or when 10 creations have been attempted.
    :return: number of tasks taken from self.pending_tasks in this call
    """
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], task_index)
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], task_index))
                    # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    # arguments follow the same order than for the task itself: action, item and task id
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task {}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency.get("error_msg")))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                else:
                    # task_dependency is here the dependency found at SCHEDULED status
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                        "(task {}.{})".format(task["action"], task["item"],
                                              task["instance_action_id"], task["task_index"],
                                              task_dependency["action"], task_dependency["item"],
                                              task_dependency["instance_action_id"],
                                              task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # "FAILED" is the status used for errors in the rest of this method and the one that
                # dependent tasks look for; the former "ERROR" is not among the task status values
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            elif task["item"] == 'instance_vms':
                if task["action"] == "CREATE":
                    result, database_update = self.new_vm(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_vm(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_nets':
                if task["action"] == "CREATE":
                    result, database_update = self.new_net(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_net(task)
                elif task["action"] == "FIND":
                    result, database_update = self.get_net(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfis':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfi(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfi(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfs':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sf(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sf(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_classifications':
                if task["action"] == "CREATE":
                    result, database_update = self.new_classification(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_classification(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfps':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfp(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfp(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            else:
                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                # TODO
        except VimThreadException as e:
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        # items that are never inserted in the refresh list
        no_refresh_tasks = ['instance_sfis', 'instance_sfs',
                            'instance_classifications', 'instance_sfps']
        if task["action"] == "DELETE":
            action_key = task["item"] + task["item_id"]
            # pop instead of del: a second DELETE over the same element finds the group already removed
            # and must not abort this method before the database is updated
            self.grouped_tasks.pop(action_key, None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            if task["item"] not in no_refresh_tasks:
                self._insert_refresh(task)

        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
            task_id, task["item"], task["action"], task["status"],
            task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
        try:
            now = time.time()
            with self.db_lock:
                self.db.update_rows(
                    table="vim_wim_actions",
                    UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                            "error_msg": task["error_msg"],
                            "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                    WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                if result is not None:
                    # account the task at the global action as done or failed
                    self.db.update_rows(
                        table="instance_actions",
                        UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                "modified_at": now},
                        WHERE={"uuid": task["instance_action_id"]})
                if database_update:
                    self.db.update_rows(table=task["item"],
                                        UPDATE=database_update,
                                        WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)

        if nb_created == 10:
            break
    return nb_processed
668
def _insert_pending_tasks(self, vim_actions_list):
    """Load into memory a list of tasks (rows of vim_wim_actions) and classify them.
    For every task of this VIM account it fills the memory-only entries ('params', 'depends',
    'vim_interfaces'), converts 'extra' from yaml text into a dictionary, registers the task at
    self.grouped_tasks and inserts it at self.pending_tasks (DELETE and SCHEDULED tasks) or in the refresh
    list (CREATE or FIND tasks at DONE or BUILD status).
    A DELETE task marks as SUPERSEDED the previous tasks over the same element, and it is marked itself as
    SUPERSEDED when none of them has created something at the VIM.
    :param vim_actions_list: list of task dictionaries. Tasks of other datacenter_vim_id are ignored
    :return: None
    :raise vimconn.vimconnException: if a task is not SCHEDULED and its action is not DELETE, CREATE or FIND
    """
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        item = task["item"]
        item_id = task["item_id"]
        action_key = item + item_id
        if action_key not in self.grouped_tasks:
            self.grouped_tasks[action_key] = []
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # NOTE(review): yaml.load without a safe Loader; 'extra' comes from the database - confirm that
            # it cannot hold untrusted content or consider yaml.safe_load
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            depends_on_list = extra.get("depends_on")
            if depends_on_list:
                for dependency_task in depends_on_list:
                    if isinstance(dependency_task, int):
                        index = dependency_task
                    else:
                        # text with format "<instance_action_id>.<task index>"
                        instance_action_id, _, task_id = dependency_task.rpartition(".")
                        if instance_action_id != task["instance_action_id"]:
                            # dependencies on other instance actions are not resolved here
                            continue
                        index = int(task_id)

                    # the dependency is resolved only when it is found in this same list, at the position
                    # that matches its task_index
                    if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
                            vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
                        task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index]
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        if "error_msg" not in task:
            task["error_msg"] = None
        if "vim_id" not in task:
            task["vim_id"] = None

        if task["action"] == "DELETE":
            need_delete_action = False
            for to_supersede in self.grouped_tasks.get(action_key, ()):
                if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                    task["vim_id"] = to_supersede["vim_id"]
                # 'created' is assumed True when it is not present at extra
                if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
                        (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
                    need_delete_action = True
                    # the DELETE task inherits what it needs to remove the element
                    task["vim_id"] = to_supersede["vim_id"]
                    if to_supersede["extra"].get("sdn_net_id"):
                        task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
                    if to_supersede["extra"].get("interfaces"):
                        task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    if to_supersede["extra"].get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                to_supersede["status"] = "SUPERSEDED"
            if not need_delete_action:
                # nothing to delete at the VIM; the task is kept only to update the database
                task["status"] = "SUPERSEDED"

            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["action"] in ("CREATE", "FIND"):
            self.grouped_tasks[action_key].append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
744
def insert_task(self, task):
    """Put an element at the input queue of this thread without blocking.
    :param task: element to queue. run() handles a list of tasks and the texts 'exit' and 'reload'
    :return: None
    :raise vimconn.vimconnException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
751
def del_task(self, task):
    """Cancel a task that has not been started yet.
    :param task: task dictionary. Its 'status' is changed to SUPERSEDED when it is at SCHEDULED
    :return: True if the task has been cancelled, False if its status is not SCHEDULED
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # The 'with' statement already releases the lock. The explicit release() that was here released
        # it twice, which raises an error with a plain Lock
        return False
760
def run(self):
    """Main loop of the thread.
    It loads the VIM connector and the tasks stored at database, and then it loops: attending the elements
    received at self.task_queue, processing pending tasks and refreshing elements, sleeping one second when
    there is nothing to do.
    Elements of the queue can be: a list of tasks, inserted with _insert_pending_tasks; the text 'reload',
    that makes the connector and the database tasks to be loaded again; or the text 'exit', that ends the
    thread.
    :return: 0 when 'exit' is received
    """
    self.logger.debug("Starting")
    while True:
        self.get_vimconnector()
        self.logger.debug("Vimconnector loaded")
        self._reload_vim_actions()
        reload_thread = False

        while True:
            try:
                while not self.task_queue.empty():
                    task = self.task_queue.get()
                    if isinstance(task, list):
                        self._insert_pending_tasks(task)
                    elif isinstance(task, str):
                        if task == 'exit':
                            # logged here because the statement placed after the endless loop could never
                            # be reached
                            self.logger.debug("Finishing")
                            return 0
                        elif task == 'reload':
                            reload_thread = True
                            break
                    self.task_queue.task_done()
                if reload_thread:
                    break
                nb_processed = self._proccess_pending_tasks()
                nb_processed += self._refres_elements()
                if not nb_processed:
                    time.sleep(1)

            except Exception as e:
                # the thread must survive to any error; it is logged and the loop continues
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
793
def _look_for_task(self, instance_action_id, task_id):
    """
    Get a concrete task from the vim_wim_actions database table
    :param instance_action_id: instance_action_id used for the search, unless task_id contains another one
    :param task_id: task reference with one of these formats:
        <task index>: integer
        TASK-<task index>: backward compatibility
        [TASK-]<instance_action_id>.<task index>: this instance_action_id is used instead of the parameter
    :return: the first row found as a dictionary, with "params", "depends" and "extra" filled; None if not found
    """
    task_index = task_id
    if not isinstance(task_id, int):
        reference = task_id[5:] if task_id.startswith("TASK-") else task_id
        explicit_action_id, _, task_index = reference.rpartition(".")
        if explicit_action_id:
            instance_action_id = explicit_action_id

    where_ = {"instance_action_id": instance_action_id, "task_index": task_index}
    with self.db_lock:
        rows = self.db.get_rows(FROM="vim_wim_actions", WHERE=where_)
    if not rows:
        return None

    found = rows[0]
    found["params"] = None
    found["depends"] = {}
    if not found["extra"]:
        found["extra"] = {}
        return found

    # "extra" is stored as yaml text at database; it is converted into a dictionary
    # NOTE(review): yaml.load can build arbitrary python objects; confirm this database content is trusted
    extra = yaml.load(found["extra"])
    found["extra"] = extra
    found["params"] = extra.get("params")
    if extra.get("interfaces"):
        found["vim_interfaces"] = {}
    return found
830
@staticmethod
def _format_vim_error_msg(error_text, max_length=1024):
    """
    Shorten an error text keeping its beginning and its end, joined by " ... "
    :param error_text: text to shorten. Empty or None values are returned as they are
    :param max_length: texts with this length or longer are shortened
    :return: the same text if it is shorter than max_length, otherwise the shortened text
    """
    if not error_text or len(error_text) < max_length:
        return error_text
    head_end = max_length // 2 - 3
    tail_start = -max_length // 2 + 3
    return " ... ".join((error_text[:head_end], error_text[tail_start:]))
836
def new_vm(self, task):
    """
    Create a VM at the VIM. Networks referenced by a task id at the interface list are first translated
    into the vim_id of the depending network task.
    :param task: task dictionary. task["params"] contains the positional arguments for
        self.vim.new_vminstance, being the item 5 the list of interfaces. task["depends"] can contain
        the depending tasks indexed by task id; if missing there, they are looked for at database
    :return: (True, instance_element_update) on success, (False, instance_element_update) on error.
        instance_element_update is a dictionary with "status", "vim_vm_id" and "error_msg".
        The task is updated with the result ("status", "vim_id", "error_msg", "extra", ...)
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends") or {}
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = depends.get(net["net_id"])
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # The error is taken from the dependency really used: it can come from database and
                    # then it is not present at task["depends"]
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        # The connector receives a copy; the "vim_id" of each interface is read from this copy afterwards
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for sdn_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',),
                    FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                    WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
            else:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
                                     iface["uuid"]), exc_info=True)

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
895
def del_vm(self, task):
    """
    Delete a VM at the VIM, removing before the SDN ports registered at the interfaces of the task.
    :param task: task dictionary. Uses task["vim_id"] and, from task["extra"], "interfaces" and "created_items"
    :return: (True, None) if deleted or not found at VIM, (False, None) on any other VIM error.
        task["status"] and task["error_msg"] are updated accordingly
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # "interfaces" can be missing or empty; a dictionary is always needed because values() is used below
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
                except ovimException as e:
                    # An error deleting the SDN port is logged but it does not prevent deleting the VM
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    task["status"] = "FAILED"
    return False, None
924
def _get_net_internal(self, task, filter_param):
    """
    Code shared by get_net and new_net. Looks for one network at VIM matching filter_param
    :param task: task of the find or find-or-create action. It is updated with the result
    :param filter_param: filter sent to the vimconnector get_network_list
    :return: dictionary with the content to update the instance_nets database table. Raises
        VimThreadExceptionNotFound if no network matches, VimThreadException if more than one matches
    """
    candidates = self.vim.get_network_list(filter_param)
    if not candidates:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(candidates) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = candidates[0]["id"]

    # Check at database if this VIM network has a sdn network associated
    with self.db_lock:
        rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                                WHERE={'vim_net_id': vim_net_id,
                                       'datacenter_tenant_id': self.datacenter_tenant_id},
                                ORDER="instance_scenario_id")
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    task["extra"].update({"vim_info": {}, "created": False, "sdn_net_id": sdn_net_id})
    task.update({"error_msg": None, "vim_id": vim_net_id})
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
959
def get_net(self, task):
    """
    Find an existing network at VIM using the filter at task["params"][0]
    :param task: task dictionary of the FIND action. It is updated with the result
    :return: (True, instance_element_update) if exactly one network is found,
        (False, instance_element_update) with status "VIM_ERROR" otherwise
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        filter_param = task["params"][0]
        return True, self._get_net_internal(task, filter_param)
    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} get-net: {}".format(task_id, e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
976
def new_net(self, task):
    """
    Create a network at VIM. If task["extra"]["find"] is present, first it tries to find an existing
    network with that filter and uses it if found. When the VIM config contains 'sdn-controller' and the
    network type is "data" or "ptp", the network is also created at self.ovim with the vlan of the VIM network.
    :param task: task dictionary. task["params"] contains the positional arguments for self.vim.new_network,
        being the first ones the name and the type. It is updated with the result
    :return: (True, instance_element_update) on success, (False, instance_element_update) with status
        "VIM_ERROR" on error. On error, vim_net_id and sdn_net_id contain what was created before failing
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                # Not found, continue creating it. Any other error finding it is managed by the outer except
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # get() is used also for the message: 'encapsulation' can be missing at the VIM answer
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by _get_net_internal, e.g. when more than one network is found
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
1031
def del_net(self, task):
    """
    Delete a network at VIM and, if task["extra"] contains "sdn_net_id", its sdn network and external ports
    :param task: task dictionary. Uses task["vim_id"] and task["extra"]["sdn_net_id"]
    :return: (True, None) if deleted or not found at VIM, (False, None) on any other error.
        task["status"] and task["error_msg"] are updated accordingly
    """
    net_vim_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # There can be ports attached to this sdn network if it was manually done using
            # 'openmano vim-net-sdn-attach'. They are deleted before the network
            with self.db_lock:
                attached_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for attached_port in attached_ports:
                    self.ovim.delete_port(attached_port['uuid'], idempotent=True)
                self.ovim.delete_network(sdn_net_id, idempotent=True)
        if net_vim_id:
            self.vim.delete_network(net_vim_id)
    except ovimException as e:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(e)))
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # Not found at VIM is considered as deleted, but error_msg is kept
            task["status"] = "DONE"
            return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
    task["status"] = "FAILED"
    return False, None
1061
1062 ## Service Function Instances
1063
def new_sfi(self, task):
    """
    Create a Service Function Instance at VIM using the first interface of the first depending task.
    :param task: task dictionary. The first item of task["depends"] must contain at extra.params[5] the
        list of interfaces, each one with its "vim_id". It is updated with the result
    :return: (True, instance_element_update) on success, (False, instance_element_update) with status
        "VIM_ERROR" on error
    """
    # Obtained before the 'try' because it is used at the error handler
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        # list() makes this work both with python2 lists and python3 dictionary views
        first_dependency = list(task.get("depends").values())[0]
        interfaces = first_dependency.get("extra").get("params")[5]
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
        # first ingress and first egress ports will be used to create the SFI (Port Pair).
        port_id_list = [interfaces[0].get("vim_id")]
        name = "sfi-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return False, instance_element_update
1095
def del_sfi(self, task):
    """
    Delete a Service Function Instance at VIM
    :param task: task dictionary. task["vim_id"] is the identifier at VIM of the element to delete
    :return: (True, None) if deleted or not found at VIM, (False, None) on any other VIM error.
        task["status"] and task["error_msg"] are updated accordingly
    """
    target_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfi(target_vim_id)
        task["status"] = "DONE"
        task["error_msg"] = None
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    return True, None
1112
def new_sf(self, task):
    """
    Create a Service Function at VIM grouping the Service Function Instances of the depending tasks.
    :param task: task dictionary. The "vim_id" of every item of task["depends"] is used as a SFI identifier.
        It is updated with the result
    :return: (True, instance_element_update) on success, (False, instance_element_update) with status
        "VIM_ERROR" on error
    """
    # Obtained before the 'try' because it is used at the error handler
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sfi_id_list = [sfi.get("vim_id") for sfi in task.get("depends").values()]
        name = "sf-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sf_id
        instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
        return False, instance_element_update
1144
def del_sf(self, task):
    """
    Delete a Service Function at VIM
    :param task: task dictionary. task["vim_id"] is the identifier at VIM of the element to delete
    :return: (True, None) if deleted or not found at VIM, (False, None) on any other VIM error.
        task["status"] and task["error_msg"] are updated accordingly
    """
    target_vim_id = task["vim_id"]
    try:
        self.vim.delete_sf(target_vim_id)
        task["status"] = "DONE"
        task["error_msg"] = None
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    return True, None
1161
def new_classification(self, task):
    """
    Create a Classification ('legacy_flow_classifier') at VIM with the match of task["params"], using as
    logical source port the first interface of the first depending task.
    :param task: task dictionary. task["params"] is a dictionary with "ip_proto", "source_ip",
        "destination_ip", "source_port" and "destination_port". The first item of task["depends"] must
        contain at extra.params[5] the list of interfaces. It is updated with the result
    :return: (True, instance_element_update) on success, (False, instance_element_update) with status
        "VIM_ERROR" on error
    """
    # Obtained before the 'try' because it is used at the error handler
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        # list() makes this work both with python2 lists and python3 dictionary views
        first_dependency = list(task.get("depends").values())[0]
        interfaces = first_dependency.get("extra").get("params")[5]
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]
        ip_proto = int(params.get("ip_proto"))
        # Known protocol numbers are sent by name; any other number is sent as it is
        ip_proto = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        # if not CIDR is given for the IP addresses, add /32. An address not provided is left untouched
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        if source_ip and '/' not in source_ip:
            source_ip += '/32'
        if destination_ip and '/' not in destination_ip:
            destination_ip += '/32'
        definition = {
            "logical_source_port": interfaces[0].get("vim_id"),
            "protocol": ip_proto,
            "source_ip_prefix": source_ip,
            "destination_ip_prefix": destination_ip,
            "source_port_range_min": params.get("source_port"),
            "source_port_range_max": params.get("source_port"),
            "destination_port_range_min": params.get("destination_port"),
            "destination_port_range_max": params.get("destination_port"),
        }

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None,
                                   "error_msg": error_text}
        return False, instance_element_update
1219
def del_classification(self, task):
    """
    Delete a Classification at VIM
    :param task: task dictionary. task["vim_id"] is the identifier at VIM of the element to delete
    :return: (True, None) if deleted or not found at VIM, (False, None) on any other VIM error.
        task["status"] and task["error_msg"] are updated accordingly
    """
    target_vim_id = task["vim_id"]
    try:
        self.vim.delete_classification(target_vim_id)
        task["status"] = "DONE"
        task["error_msg"] = None
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    return True, None
1236
def new_sfp(self, task):
    """
    Create a Service Function Path at VIM with the Service Functions and Classifications of the depending tasks.
    :param task: task dictionary. Every item of task["depends"] with "item" equal to "instance_sfs" or
        "instance_classifications" provides its "vim_id"; other items are ignored. It is updated with the result
    :return: (True, instance_element_update) on success, (False, instance_element_update) with status
        "VIM_ERROR" on error
    """
    # Obtained before the 'try' because it is used at the error handler
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sf_id_list = []
        classification_id_list = []
        for dep in task.get("depends").values():
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return False, instance_element_update
1275
def del_sfp(self, task):
    """
    Delete a Service Function Path at VIM
    :param task: task dictionary. task["vim_id"] is the identifier at VIM of the element to delete
    :return: (True, None) if deleted or not found at VIM, (False, None) on any other VIM error.
        task["status"] and task["error_msg"] are updated accordingly
    """
    target_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfp(target_vim_id)
        task["status"] = "DONE"
        task["error_msg"] = None
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Not found at VIM is considered as deleted, but error_msg is kept
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    return True, None