Fix problem at vim_thread reload when vim_account edited
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 can contain an int (single index on the same instance-action) or str (complete action ID)
41 sdn_net_id: used for net.
42 tries:
43 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
44 iface_id: uuid of instance_interfaces
45 sdn_port_id:
46 sdn_net_id:
47 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
48 created: False if the VIM element is not created by other actions, and it should not be deleted
49 vim_status: VIM status of the element. Stored also at database in the instance_XXX
50 M depends: dict with task_index(from depends_on) to task class
51 M params: same as extra[params] but with the resolved dependencies
52 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
53 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
54 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
55 MD created_at: task creation time
56 MD modified_at: last task update time. On refresh it contains when this task needs to be refreshed
57
58 """
59
60 import threading
61 import time
62 import Queue
63 import logging
64 import vimconn
65 import vimconn_openvim
66 import vimconn_aws
67 import vimconn_opennebula
68 import vimconn_openstack
69 import vimconn_vmware
70 import yaml
71 from db_base import db_base_Exception
72 from lib_osm_openvim.ovim import ovimException
73 from copy import deepcopy
74
__author__ = "Alfonso Tierno, Pablo Montes"
__date__ = "$28-Sep-2017 12:07:15$"

# Connector module to use for each VIM "type" value. get_vimconnector() indexes this mapping with the
# 'type' column read from database and instantiates the 'vimconnector' class of the selected module.
vim_module = dict(
    openvim=vimconn_openvim,
    aws=vimconn_aws,
    opennebula=vimconn_opennebula,
    openstack=vimconn_openstack,
    vmware=vimconn_vmware,
)
85
def is_task_id(task_id):
    """Tell if 'task_id' is a textual reference to a task, i.e. a string starting with "TASK-".

    :param task_id: value to check. Task references can also be plain integers (see 'depends_on' at the
        module documentation) or be missing (None); those are not textual task identifiers
    :return: True if it is a string starting with "TASK-", False otherwise
    """
    try:
        return task_id.startswith("TASK-")
    except AttributeError:
        # int, None, ...: not a string, so it cannot be a "TASK-xxx" reference
        return False
88
89
class VimThreadException(Exception):
    """Error raised by vim_thread while processing a task (e.g. a dependency that failed or cannot be found)."""
92
93
class VimThreadExceptionNotFound(VimThreadException):
    """Specialization of VimThreadException.

    NOTE(review): presumably meant for elements that cannot be found; it is not raised in the part of the
    file visible here, confirm against its users.
    """
96
97
98 class vim_thread(threading.Thread):
99 REFRESH_BUILD = 5 # 5 seconds
100 REFRESH_ACTIVE = 60 # 1 minute
101
102 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
103 db=None, db_lock=None, ovim=None):
104 """Init a thread.
105 Arguments:
106 'id' number of thead
107 'name' name of thread
108 'host','user': host ip or name to manage and user
109 'db', 'db_lock': database class and lock to use it in exclusion
110 """
111 threading.Thread.__init__(self)
112 self.vim = None
113 self.error_status = None
114 self.datacenter_name = datacenter_name
115 self.datacenter_tenant_id = datacenter_tenant_id
116 self.ovim = ovim
117 if not name:
118 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
119 else:
120 self.name = name
121 self.vim_persistent_info = {}
122
123 self.logger = logging.getLogger('openmano.vim.'+self.name)
124 self.db = db
125 self.db_lock = db_lock
126
127 self.task_lock = task_lock
128 self.task_queue = Queue.Queue(2000)
129
130 self.refresh_tasks = []
131 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
132
133 self.pending_tasks = []
134 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
135
136 self.grouped_tasks = {}
137 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
138 <item><item_id>:
139 - <task1> # e.g. CREATE task
140 <task2> # e.g. DELETE task
141 """
142
def get_vimconnector(self):
    """Load from database the VIM account of this thread and create its connector at self.vim.

    The connector 'config' is built merging the datacenter config ('d.config') and the datacenter tenant
    config ('dt.config') read from database, plus 'datacenter_tenant_id' and 'datacenter_id'.
    On any error self.vim is set to None and self.error_status contains the reason.
    :return: None
    """
    try:
        from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
        select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
                   'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
                   'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
                   'user', 'passwd', 'dt.config as dt_config')
        where_ = {"dt.uuid": self.datacenter_tenant_id}
        with self.db_lock:
            vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
        if not vims:
            # explicit message instead of the former 'list index out of range'
            raise VimThreadExceptionNotFound(
                "vim_account {} not found at database".format(self.datacenter_tenant_id))
        vim = vims[0]

        # The configuration columns were selected but never handed to the connector, so VIM specific
        # settings were silently ignored. The tenant config overrides the datacenter config.
        vim_config = {}
        for stored_config in (vim.get("config"), vim.get("dt_config")):
            if not stored_config:
                continue
            if not isinstance(stored_config, dict):
                # assumes the configuration is stored as yaml text -- TODO confirm against the writer
                stored_config = yaml.safe_load(stored_config)
            if isinstance(stored_config, dict):
                vim_config.update(stored_config)
        vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
        vim_config['datacenter_id'] = vim.get('datacenter_id')

        self.vim = vim_module[vim["type"]].vimconnector(
            uuid=vim['datacenter_id'], name=vim['datacenter_name'],
            tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
            url=vim['vim_url'], url_admin=vim['vim_url_admin'],
            user=vim['user'], passwd=vim['passwd'],
            config=vim_config, persistent_info=self.vim_persistent_info
        )
        self.error_status = None
    except Exception as e:
        self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
        self.vim = None
        self.error_status = "Error loading vimconnector: {}".format(e)
169
170 def _reload_vim_actions(self):
171 """
172 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
173 :return: None
174 """
175 try:
176 action_completed = False
177 task_list = []
178 old_action_key = None
179
180 old_item_id = ""
181 old_item = ""
182 old_created_at = 0.0
183 database_limit = 200
184 while True:
185 # get 200 (database_limit) entries each time
186 with self.db_lock:
187 vim_actions = self.db.get_rows(FROM="vim_actions",
188 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
189 "item_id>=": old_item_id},
190 ORDER_BY=("item_id", "item", "created_at",),
191 LIMIT=database_limit)
192 for task in vim_actions:
193 item = task["item"]
194 item_id = task["item_id"]
195
196 # skip the first entries that are already processed in the previous pool of 200
197 if old_item_id:
198 if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
199 old_item_id = False # next one will be a new un-processed task
200 continue
201
202 action_key = item + item_id
203 if old_action_key != action_key:
204 if not action_completed and task_list:
205 # This will fill needed task parameters into memory, and insert the task if needed in
206 # self.pending_tasks or self.refresh_tasks
207 try:
208 self._insert_pending_tasks(task_list)
209 except Exception as e:
210 self.logger.critical(
211 "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
212 exc_info=True)
213 task_list = []
214 old_action_key = action_key
215 action_completed = False
216 elif action_completed:
217 continue
218
219 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
220 task_list.append(task)
221 elif task["action"] == "DELETE":
222 # action completed because deleted and status is not SCHEDULED. Not needed anything
223 action_completed = True
224 if len(vim_actions) == database_limit:
225 # update variables for get the next database iteration
226 old_item_id = item_id
227 old_item = item
228 old_created_at = task["created_at"]
229 else:
230 break
231 # Last actions group need to be inserted too
232 if not action_completed and task_list:
233 try:
234 self._insert_pending_tasks(task_list)
235 except Exception as e:
236 self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
237 exc_info=True)
238 self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
239 len(self.pending_tasks), len(self.refresh_tasks)))
240 except Exception as e:
241 self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
242
243 def _refres_elements(self):
244 """Call VIM to get VMs and networks status until 10 elements"""
245 now = time.time()
246 nb_processed = 0
247 vm_to_refresh_list = []
248 net_to_refresh_list = []
249 vm_to_refresh_dict = {}
250 net_to_refresh_dict = {}
251 items_to_refresh = 0
252 while self.refresh_tasks:
253 task = self.refresh_tasks[0]
254 with self.task_lock:
255 if task['status'] == 'SUPERSEDED':
256 self.refresh_tasks.pop(0)
257 continue
258 if task['modified_at'] > now:
259 break
260 # task["status"] = "processing"
261 nb_processed += 1
262 self.refresh_tasks.pop(0)
263 if task["item"] == 'instance_vms':
264 if task["vim_id"] not in vm_to_refresh_dict:
265 vm_to_refresh_dict[task["vim_id"]] = [task]
266 vm_to_refresh_list.append(task["vim_id"])
267 else:
268 vm_to_refresh_dict[task["vim_id"]].append(task)
269 elif task["item"] == 'instance_nets':
270 if task["vim_id"] not in net_to_refresh_dict:
271 net_to_refresh_dict[task["vim_id"]] = [task]
272 net_to_refresh_list.append(task["vim_id"])
273 else:
274 net_to_refresh_dict[task["vim_id"]].append(task)
275 else:
276 task_id = task["instance_action_id"] + "." + str(task["task_index"])
277 self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
278 items_to_refresh += 1
279 if items_to_refresh == 10:
280 break
281
282 if vm_to_refresh_list:
283 now = time.time()
284 try:
285 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
286 except vimconn.vimconnException as e:
287 # Mark all tasks at VIM_ERROR status
288 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
289 vim_dict = {}
290 for vim_id in vm_to_refresh_list:
291 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
292
293 for vim_id, vim_info in vim_dict.items():
294
295 # look for task
296 for task in vm_to_refresh_dict[vim_id]:
297 task_need_update = False
298 task_id = task["instance_action_id"] + "." + str(task["task_index"])
299 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
300
301 # check and update interfaces
302 task_warning_msg = ""
303 for interface in vim_info.get("interfaces", ()):
304 vim_interface_id = interface["vim_interface_id"]
305 if vim_interface_id not in task["extra"]["interfaces"]:
306 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
307 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
308 continue
309 task_interface = task["extra"]["interfaces"][vim_interface_id]
310 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
311 if task_vim_interface != interface:
312 # delete old port
313 if task_interface.get("sdn_port_id"):
314 try:
315 with self.db_lock:
316 self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
317 task_interface["sdn_port_id"] = None
318 task_need_update = True
319 except ovimException as e:
320 error_text = "ovimException deleting external_port={}: {}".format(
321 task_interface["sdn_port_id"], e)
322 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
323 task_warning_msg += error_text
324 # TODO Set error_msg at instance_nets instead of instance VMs
325
326 # Create SDN port
327 sdn_net_id = task_interface.get("sdn_net_id")
328 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
329 sdn_port_name = sdn_net_id + "." + task["vim_id"]
330 sdn_port_name = sdn_port_name[:63]
331 try:
332 with self.db_lock:
333 sdn_port_id = self.ovim.new_external_port(
334 {"compute_node": interface["compute_node"],
335 "pci": interface["pci"],
336 "vlan": interface.get("vlan"),
337 "net_id": sdn_net_id,
338 "region": self.vim["config"]["datacenter_id"],
339 "name": sdn_port_name,
340 "mac": interface.get("mac_address")})
341 task_interface["sdn_port_id"] = sdn_port_id
342 task_need_update = True
343 except (ovimException, Exception) as e:
344 error_text = "ovimException creating new_external_port compute_node={}"\
345 " pci={} vlan={} {}".format(
346 interface["compute_node"],
347 interface["pci"],
348 interface.get("vlan"), e)
349 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
350 task_warning_msg += error_text
351 # TODO Set error_msg at instance_nets instead of instance VMs
352
353 with self.db_lock:
354 self.db.update_rows(
355 'instance_interfaces',
356 UPDATE={"mac_address": interface.get("mac_address"),
357 "ip_address": interface.get("ip_address"),
358 "vim_info": interface.get("vim_info"),
359 "sdn_port_id": task_interface.get("sdn_port_id"),
360 "compute_node": interface.get("compute_node"),
361 "pci": interface.get("pci"),
362 "vlan": interface.get("vlan")},
363 WHERE={'uuid': task_interface["iface_id"]})
364 task["vim_interfaces"][vim_interface_id] = interface
365
366 # check and update task and instance_vms database
367 vim_info_error_msg = None
368 if vim_info.get("error_msg"):
369 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
370 elif task_warning_msg:
371 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
372 task_vim_info = task.get("vim_info")
373 task_error_msg = task.get("error_msg")
374 task_vim_status = task["extra"].get("vim_status")
375 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
376 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
377 temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
378 if vim_info.get("vim_info"):
379 temp_dict["vim_info"] = vim_info["vim_info"]
380 with self.db_lock:
381 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
382 task["extra"]["vim_status"] = vim_info["status"]
383 task["error_msg"] = vim_info_error_msg
384 if vim_info.get("vim_info"):
385 task["vim_info"] = vim_info["vim_info"]
386 task_need_update = True
387
388 if task_need_update:
389 with self.db_lock:
390 self.db.update_rows(
391 'vim_actions',
392 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
393 "error_msg": task.get("error_msg"), "modified_at": now},
394 WHERE={'instance_action_id': task['instance_action_id'],
395 'task_index': task['task_index']})
396 if task["extra"].get("vim_status") == "BUILD":
397 self._insert_refresh(task, now + self.REFRESH_BUILD)
398 else:
399 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
400
401 if net_to_refresh_list:
402 now = time.time()
403 try:
404 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
405 except vimconn.vimconnException as e:
406 # Mark all tasks at VIM_ERROR status
407 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
408 vim_dict = {}
409 for vim_id in net_to_refresh_list:
410 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
411
412 for vim_id, vim_info in vim_dict.items():
413 # look for task
414 for task in net_to_refresh_dict[vim_id]:
415 task_id = task["instance_action_id"] + "." + str(task["task_index"])
416 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
417
418 task_vim_info = task.get("vim_info")
419 task_vim_status = task["extra"].get("vim_status")
420 task_error_msg = task.get("error_msg")
421 task_sdn_net_id = task["extra"].get("sdn_net_id")
422
423 vim_info_status = vim_info["status"]
424 vim_info_error_msg = vim_info.get("error_msg")
425 # get ovim status
426 if task_sdn_net_id:
427 try:
428 with self.db_lock:
429 sdn_net = self.ovim.show_network(task_sdn_net_id)
430 except (ovimException, Exception) as e:
431 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
432 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
433 sdn_net = {"status": "ERROR", "last_error": text_error}
434 if sdn_net["status"] == "ERROR":
435 if not vim_info_error_msg:
436 vim_info_error_msg = str(sdn_net.get("last_error"))
437 else:
438 vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
439 self._format_vim_error_msg(vim_info_error_msg, 1024//2-14),
440 self._format_vim_error_msg(sdn_net["last_error"], 1024//2-14))
441 vim_info_status = "ERROR"
442 elif sdn_net["status"] == "BUILD":
443 if vim_info_status == "ACTIVE":
444 vim_info_status = "BUILD"
445
446 # update database
447 if vim_info_error_msg:
448 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
449 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
450 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
451 task["extra"]["vim_status"] = vim_info_status
452 task["error_msg"] = vim_info_error_msg
453 if vim_info.get("vim_info"):
454 task["vim_info"] = vim_info["vim_info"]
455 temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
456 if vim_info.get("vim_info"):
457 temp_dict["vim_info"] = vim_info["vim_info"]
458 with self.db_lock:
459 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
460 self.db.update_rows(
461 'vim_actions',
462 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
463 "error_msg": task.get("error_msg"), "modified_at": now},
464 WHERE={'instance_action_id': task['instance_action_id'],
465 'task_index': task['task_index']})
466 if task["extra"].get("vim_status") == "BUILD":
467 self._insert_refresh(task, now + self.REFRESH_BUILD)
468 else:
469 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
470
471 return nb_processed
472
473 def _insert_refresh(self, task, threshold_time=None):
474 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
475 It is assumed that this is called inside this thread
476 """
477 if not self.vim:
478 return
479 if not threshold_time:
480 threshold_time = time.time()
481 task["modified_at"] = threshold_time
482 task_name = task["item"][9:] + "-" + task["action"]
483 task_id = task["instance_action_id"] + "." + str(task["task_index"])
484 for index in range(0, len(self.refresh_tasks)):
485 if self.refresh_tasks[index]["modified_at"] > threshold_time:
486 self.refresh_tasks.insert(index, task)
487 break
488 else:
489 index = len(self.refresh_tasks)
490 self.refresh_tasks.append(task)
491 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
492 task_id, task_name, task["modified_at"], index))
493
494 def _remove_refresh(self, task_name, vim_id):
495 """Remove a task with this name and vim_id from the list of refreshing elements.
496 It is assumed that this is called inside this thread outside _refres_elements method
497 Return True if self.refresh_list is modified, task is found
498 Return False if not found
499 """
500 index_to_delete = None
501 for index in range(0, len(self.refresh_tasks)):
502 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
503 index_to_delete = index
504 break
505 else:
506 return False
507 if not index_to_delete:
508 del self.refresh_tasks[index_to_delete]
509 return True
510
def _proccess_pending_tasks(self):
    """Process the pending tasks: check their dependencies, execute them at VIM and update the database.

    It stops when there are no more pending tasks or when 10 elements have been created.
    A task whose dependency is still SCHEDULED is moved to the end of the list, up to 3 times.
    NOTE(review): the retries are consumed inside this same loop, without waiting; confirm this is intended.
    :return: number of tasks processed
    """
    # item -> action -> name of the method of this class that performs it
    task_handlers = {
        'instance_vms': {"CREATE": "new_vm", "DELETE": "del_vm"},
        'instance_nets': {"CREATE": "new_net", "DELETE": "del_net", "FIND": "get_net"},
        'instance_sfis': {"CREATE": "new_sfi", "DELETE": "del_sfi"},
        'instance_sfs': {"CREATE": "new_sf", "DELETE": "del_sf"},
        'instance_classifications': {"CREATE": "new_classification", "DELETE": "del_classification"},
        'instance_sfps': {"CREATE": "new_sfp", "DELETE": "del_sfp"},
    }
    # items that are not inserted at the refresh list once created
    no_refresh_tasks = ('instance_sfis', 'instance_sfs',
                        'instance_classifications', 'instance_sfps')
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], task_index)
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], task_index))
                # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task {}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency.get("error_msg")))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                raise VimThreadException(
                    "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                    "(task {}.{})".format(task["action"], task["item"],
                                          task["instance_action_id"], task["task_index"],
                                          task_dependency["action"], task_dependency["item"],
                                          task_dependency["instance_action_id"],
                                          task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # "FAILED" is the status documented for tasks and the one checked by the depending tasks
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            else:
                item_actions = task_handlers.get(task["item"])
                if item_actions is None:
                    raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                handler_name = item_actions.get(task["action"])
                if handler_name is None:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
                result, database_update = getattr(self, handler_name)(task)
                if task["action"] == "CREATE":
                    nb_created += 1
                # TODO
        except VimThreadException as e:
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        if task["action"] == "DELETE":
            # the element does not need more attention. The group could be already removed
            self.grouped_tasks.pop(task["item"] + task["item_id"], None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            if task["item"] not in no_refresh_tasks:
                self._insert_refresh(task)

        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
            task_id, task["item"], task["action"], task["status"],
            task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
        try:
            now = time.time()
            with self.db_lock:
                self.db.update_rows(
                    table="vim_actions",
                    UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                            "error_msg": task["error_msg"],
                            "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                    WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                if result is not None:
                    self.db.update_rows(
                        table="instance_actions",
                        UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                "modified_at": now},
                        WHERE={"uuid": task["instance_action_id"]})
                if database_update:
                    self.db.update_rows(table=task["item"],
                                        UPDATE=database_update,
                                        WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)

        if nb_created == 10:
            break
    return nb_processed
663
664 def _insert_pending_tasks(self, vim_actions_list):
665 for task in vim_actions_list:
666 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
667 continue
668 item = task["item"]
669 item_id = task["item_id"]
670 action_key = item + item_id
671 if action_key not in self.grouped_tasks:
672 self.grouped_tasks[action_key] = []
673 task["params"] = None
674 task["depends"] = {}
675 if task["extra"]:
676 extra = yaml.load(task["extra"])
677 task["extra"] = extra
678 task["params"] = extra.get("params")
679 depends_on_list = extra.get("depends_on")
680 if depends_on_list:
681 for dependency_task in depends_on_list:
682 if isinstance(dependency_task, int):
683 index = dependency_task
684 else:
685 instance_action_id, _, task_id = dependency_task.rpartition(".")
686 if instance_action_id != task["instance_action_id"]:
687 continue
688 index = int(task_id)
689
690 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
691 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
692 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
693 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index]
694 if extra.get("interfaces"):
695 task["vim_interfaces"] = {}
696 else:
697 task["extra"] = {}
698 if "error_msg" not in task:
699 task["error_msg"] = None
700 if "vim_id" not in task:
701 task["vim_id"] = None
702
703 if task["action"] == "DELETE":
704 need_delete_action = False
705 for to_supersede in self.grouped_tasks.get(action_key, ()):
706 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
707 task["vim_id"] = to_supersede["vim_id"]
708 if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
709 (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
710 need_delete_action = True
711 task["vim_id"] = to_supersede["vim_id"]
712 if to_supersede["extra"].get("sdn_net_id"):
713 task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
714 if to_supersede["extra"].get("interfaces"):
715 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
716 if to_supersede["extra"].get("created_items"):
717 if not task["extra"].get("created_items"):
718 task["extra"]["created_items"] = {}
719 task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
720 # Mark task as SUPERSEDED.
721 # If task is in self.pending_tasks, it will be removed and database will be update
722 # If task is in self.refresh_tasks, it will be removed
723 to_supersede["status"] = "SUPERSEDED"
724 if not need_delete_action:
725 task["status"] = "SUPERSEDED"
726
727 self.grouped_tasks[action_key].append(task)
728 self.pending_tasks.append(task)
729 elif task["status"] == "SCHEDULED":
730 self.grouped_tasks[action_key].append(task)
731 self.pending_tasks.append(task)
732 elif task["action"] in ("CREATE", "FIND"):
733 self.grouped_tasks[action_key].append(task)
734 if task["status"] in ("DONE", "BUILD"):
735 self._insert_refresh(task)
736 # TODO add VM reset, get console, etc...
737 else:
738 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
739
740 def insert_task(self, task):
741 try:
742 self.task_queue.put(task, False)
743 return None
744 except Queue.Full:
745 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
746
747 def del_task(self, task):
748 with self.task_lock:
749 if task["status"] == "SCHEDULED":
750 task["status"] = "SUPERSEDED"
751 return True
752 else: # task["status"] == "processing"
753 self.task_lock.release()
754 return False
755
756 def run(self):
757 self.logger.debug("Starting")
758 while True:
759 self.get_vimconnector()
760 self.logger.debug("Vimconnector loaded")
761 self._reload_vim_actions()
762 reload_thread = False
763
764 while True:
765 try:
766 while not self.task_queue.empty():
767 task = self.task_queue.get()
768 if isinstance(task, list):
769 self._insert_pending_tasks(task)
770 elif isinstance(task, str):
771 if task == 'exit':
772 return 0
773 elif task == 'reload':
774 reload_thread = True
775 break
776 self.task_queue.task_done()
777 if reload_thread:
778 break
779 nb_processed = self._proccess_pending_tasks()
780 nb_processed += self._refres_elements()
781 if not nb_processed:
782 time.sleep(1)
783
784 except Exception as e:
785 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
786
787 self.logger.debug("Finishing")
788
def _look_for_task(self, instance_action_id, task_id):
    """
    Look for a concrete task at vim_actions database table
    :param instance_action_id: The instance_action_id
    :param task_id: Can have several formats:
        <task index>: integer
        TASK-<task index> :backward compatibility,
        [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
    :return: Task dictionary or None if not found. The "extra" text is loaded into a dict (empty dict if there is
        no extra), "params" is taken from it and "depends" is set to an empty dict
    """
    task_index = task_id
    if not isinstance(task_id, int):
        if task_id.startswith("TASK-"):
            task_id = task_id[5:]
        prefix, _, task_index = task_id.rpartition(".")
        if prefix:
            instance_action_id = prefix

    where = {"instance_action_id": instance_action_id, "task_index": task_index}
    with self.db_lock:
        rows = self.db.get_rows(FROM="vim_actions", WHERE=where)
    if not rows:
        return None

    found = rows[0]
    found["params"] = None
    found["depends"] = {}
    if not found["extra"]:
        found["extra"] = {}
        return found

    # NOTE(review): yaml.load is called without an explicit Loader. Presumably this text is only written by this
    # application; confirm, or consider yaml.safe_load
    extra = yaml.load(found["extra"])
    found["extra"] = extra
    found["params"] = extra.get("params")
    if extra.get("interfaces"):
        found["vim_interfaces"] = {}
    return found
825
@staticmethod
def _format_vim_error_msg(error_text, max_length=1024):
    """
    Shorten an error text that reaches max_length, keeping its beginning and its end joined by " ... "
    :param error_text: text to format. None or an empty text is returned unchanged
    :param max_length: texts shorter than this are returned unchanged
    :return: the original text, or a shortened one that is never longer than max_length
    """
    if not error_text or len(error_text) < max_length:
        return error_text
    head_length = max_length // 2 - 3
    tail_length = (max_length + 1) // 2 - 3
    if head_length < 0 or tail_length <= 0:
        # no room for both ends and the separator: a zero or positive tail index would return the whole text
        return error_text[:max(max_length, 0)]
    return error_text[:head_length] + " ... " + error_text[-tail_length:]
831
def new_vm(self, task):
    """
    Create a VM at the VIM. The net_id of the networks (params[5]) that reference another task are replaced by
    the vim_id of that task, which is taken from task["depends"] or, if not there, looked for at database.
    :param task: task dictionary. Its "status", "error_msg", "vim_id", "extra", "vim_info" and "vim_interfaces"
        are updated with the result
    :return: (True, dict) if created or (False, dict) on error. The dict contains the "status", "vim_vm_id" and
        "error_msg" values for the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends") or {}
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = depends.get(net["net_id"])
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # read the error from the dependency actually found: it may come from the database and
                    # then it is not present at task["depends"]
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',),
                    FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                    WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
            else:
                # there is no exception being handled at this point, so no exc_info is requested
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(
                    task_id, iface["uuid"]))

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
890
def del_vm(self, task):
    """
    Delete a VM at the VIM. Before that, the sdn ports stored at the interfaces of the task (if any) are deleted.
    An error deleting a sdn port is logged and does not stop the deletion.
    :param task: task dictionary. "vim_id" is the VM to delete; "extra" can contain "interfaces" and
        "created_items". Its "status" and "error_msg" are updated
    :return: (True, None) if deleted or not found at the VIM, (False, None) on any other vimconn error
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # the fallback must be a dict: a tuple has no values() and the deletion crashed when there were no interfaces
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
                except ovimException as e:
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
        task["status"] = "FAILED"
        return False, None
919
def _get_net_internal(self, task, filter_param):
    """
    Common code for get_net and new_net. It looks for a network on VIM with the filter_params
    :param task: task for this find or find-or-create action. It is updated as DONE with the network found
    :param filter_param: parameters to send to the vimconnector
    :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
        when network is not found or found more than one
    """
    found_nets = self.vim.get_network_list(filter_param)
    if not found_nets:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(found_nets) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = found_nets[0]["id"]

    # Discover if this network is managed by a sdn controller
    where = {'vim_net_id': vim_net_id, 'instance_scenario_id': None,
             'datacenter_tenant_id': self.datacenter_tenant_id}
    with self.db_lock:
        rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets', WHERE=where)
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    extra = task["extra"]
    extra["vim_info"] = {}
    extra["created"] = False
    extra["sdn_net_id"] = sdn_net_id
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
953
def get_net(self, task):
    """
    Find an existing network at the VIM using the first item of the task params as filter.
    :param task: task dictionary. It is updated by _get_net_internal when found, or set as FAILED on error
    :return: (True, dict) if found or (False, dict) on error. The dict contains the values to update the
        instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        return True, self._get_net_internal(task, task["params"][0])
    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} get-net: {}".format(task_id, e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        task["error_msg"] = self._format_vim_error_msg(str(e))
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": task["error_msg"]}
970
def new_net(self, task):
    """
    Create a network at the VIM. If task["extra"] contains "find", the network is looked for first and it is
    only created when it is not found. When the VIM config contains a 'sdn-controller' and the network type is
    "data" or "ptp", a sdn network is created too using the vlan obtained from the VIM network.
    :param task: task dictionary. Its "status", "error_msg", "vim_id" and "extra" are updated with the result
    :return: (True, dict) if found or created, or (False, dict) on error. The dict contains the values to update
        the instance element
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and net_type in ("data", "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            encapsulation = vim_net.get('encapsulation')
            if encapsulation != 'vlan':
                # 'encapsulation' can be missing, so it must not be read with [] to build the message
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, encapsulation))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by the find step (e.g. more than one network found), same as at get_net
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
1025
def del_net(self, task):
    """
    Delete a network at the VIM and, if the task has a "sdn_net_id", the sdn network and its external ports.
    :param task: task dictionary. "vim_id" is the VIM network and task["extra"]["sdn_net_id"] the sdn one. Its
        "status" and "error_msg" are updated
    :return: (True, None) if deleted or not found at the VIM, (False, None) on any other error
    """
    net_vim_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            with self.db_lock:
                attached_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for attached in attached_ports:
                    self.ovim.delete_port(attached['uuid'], idempotent=True)
                self.ovim.delete_network(sdn_net_id, idempotent=True)
        if net_vim_id:
            self.vim.delete_network(net_vim_id)
    except ovimException as e:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(e)))
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
    task["status"] = "FAILED"
    return False, None
1055
1056 ## Service Function Instances
1057
def new_sfi(self, task):
    """
    Create a Service Function Instance at the VIM using the first interface of the first task it depends on.
    :param task: task dictionary. Its "status", "error_msg", "vim_id" and "extra" are updated with the result
    :return: (True, dict) if created or (False, dict) on error. The dict contains the "status", "vim_sfi_id" and
        "error_msg" values for the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        # list() is needed because the values of a dict cannot be indexed at python3
        first_dependency = list(task.get("depends").values())[0]
        interfaces = first_dependency.get("extra").get("params")[5]
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
        # first ingress and first egress ports will be used to create the SFI (Port Pair).
        port_id_list = [interfaces[0].get("vim_id")]
        name = "sfi-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return False, instance_element_update
1089
def del_sfi(self, task):
    """
    Delete a Service Function Instance at the VIM.
    :param task: task dictionary. "vim_id" is the identifier sent to the VIM. Its "status" and "error_msg" are
        updated
    :return: (True, None) if deleted or not found at the VIM, (False, None) on any other vimconn error
    """
    try:
        self.vim.delete_sfi(task["vim_id"])
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done, keeping the error_msg filled
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1106
def new_sf(self, task):
    """
    Create a Service Function at the VIM with the "vim_id" of all the tasks it depends on.
    :param task: task dictionary. Its "status", "error_msg", "vim_id" and "extra" are updated with the result
    :return: (True, dict) if created or (False, dict) on error. The dict contains the "status", "vim_sf_id" and
        "error_msg" values for the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sfi_id_list = [sfi.get("vim_id") for sfi in task.get("depends").values()]
        name = "sf-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sf_id
        instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
        return False, instance_element_update
1138
def del_sf(self, task):
    """
    Delete a Service Function at the VIM.
    :param task: task dictionary. "vim_id" is the identifier sent to the VIM. Its "status" and "error_msg" are
        updated
    :return: (True, None) if deleted or not found at the VIM, (False, None) on any other vimconn error
    """
    try:
        self.vim.delete_sf(task["vim_id"])
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done, keeping the error_msg filled
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1155
def new_classification(self, task):
    """
    Create a Classification of type 'legacy_flow_classifier' at the VIM. The logical source port is the first
    interface of the first task it depends on; the rest of the definition is taken from the task params.
    :param task: task dictionary. "params" is a dict with "ip_proto", "source_ip", "destination_ip",
        "source_port" and "destination_port". Its "status", "error_msg", "vim_id" and "extra" are updated
    :return: (True, dict) if created or (False, dict) on error. The dict contains the "status",
        "vim_classification_id" and "error_msg" values for the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        # list() is needed because the values of a dict cannot be indexed at python3
        first_dependency = list(task.get("depends").values())[0]
        interfaces = first_dependency.get("extra").get("params")[5]
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]
        ip_proto = int(params.get("ip_proto"))
        # the known protocol numbers are sent by name, any other one is sent as a number
        ip_proto = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        # if not CIDR is given for the IP addresses, add /32. An address not provided is sent as it is
        if source_ip and '/' not in source_ip:
            source_ip += '/32'
        if destination_ip and '/' not in destination_ip:
            destination_ip += '/32'
        definition = {
            "logical_source_port": interfaces[0].get("vim_id"),
            "protocol": ip_proto,
            "source_ip_prefix": source_ip,
            "destination_ip_prefix": destination_ip,
            "source_port_range_min": params.get("source_port"),
            "source_port_range_max": params.get("source_port"),
            "destination_port_range_min": params.get("destination_port"),
            "destination_port_range_max": params.get("destination_port"),
        }

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return False, instance_element_update
1213
def del_classification(self, task):
    """
    Delete a Classification at the VIM.
    :param task: task dictionary. "vim_id" is the identifier sent to the VIM. Its "status" and "error_msg" are
        updated
    :return: (True, None) if deleted or not found at the VIM, (False, None) on any other vimconn error
    """
    try:
        self.vim.delete_classification(task["vim_id"])
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done, keeping the error_msg filled
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1230
def new_sfp(self, task):
    """
    Create a Service Function Path at the VIM. The "vim_id" of the tasks it depends on are used as service
    functions (item "instance_sfs") or as classifications (item "instance_classifications").
    :param task: task dictionary. Its "status", "error_msg", "vim_id" and "extra" are updated with the result
    :return: (True, dict) if created or (False, dict) on error. The dict contains the "status", "vim_sfp_id" and
        "error_msg" values for the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sf_id_list = []
        classification_id_list = []
        for dep in task.get("depends").values():
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        # the log names the Service Function Path, so it is not confused with the errors of new_sf
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return False, instance_element_update
1269
def del_sfp(self, task):
    """
    Delete a Service Function Path at the VIM.
    :param task: task dictionary. "vim_id" is the identifier sent to the VIM. Its "status" and "error_msg" are
        updated
    :return: (True, None) if deleted or not found at the VIM, (False, None) on any other vimconn error
    """
    try:
        self.vim.delete_sfp(task["vim_id"])
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done, keeping the error_msg filled
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None