Merge "Refactor to support OVS instead of prepopulated tagged interfaces and linux...
[osm/openvim.git] / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
'''
This is a thread that interacts with the host and with libvirt to manage VMs.
One thread is launched per host.
'''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 from jsonschema import validate as js_v, exceptions as js_e
38 #import libvirt
39 import imp
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 #from logging import Logger
43 #import auxiliary_functions as af
44
45 #TODO: insert a logging system
46
47
48 #global lvirt_module
49 #lvirt_module=None #libvirt module is charged only if not in test mode
50
51 class host_thread(threading.Thread):
52 lvirt_module = None
    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, develop_bridge_iface):
        '''Init a thread. One host_thread instance is launched per compute host.
        Arguments:
            'name': name of the thread
            'host', 'user': host ip or name to manage, and the ssh user to use
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': when True run in test mode (no ssh/libvirt interaction)
            'image_path': directory on the host where VM image files are kept
            'host_id': uuid of this host at the database
            'version': openvim version string
            'develop_mode': when True relax resource handling (no hugepages/cpu pinning)
            'develop_bridge_iface': bridge to map dataplane interfaces to in develop mode
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test

        # libvirt is imported dynamically, once, into a class attribute so that
        # test mode keeps working on machines without python-libvirt installed
        if not test and not host_thread.lvirt_module:
            try:
                module_info = imp.find_module("libvirt")
                host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
            except (IOError, ImportError) as e:
                raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))


        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        self.xml_level = 0  # current nesting depth while generating libvirt XML
        #self.pending ={}

        self.server_status = {} #dictionary with pairs server_uuid:server_status
        self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
        self.next_update_server_status = 0 #time when must be check servers status

        self.hostinfo = None  # host description loaded from <image_path>/hostinfo.yaml

        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)
        self.ssh_conn = None  # paramiko.SSHClient, created by ssh_connect()
95
    def ssh_connect(self):
        """Open an SSH session to the managed host and store it at self.ssh_conn.
        On failure the error is only printed; callers retry by calling again."""
        try:
            #Connect SSH
            self.ssh_conn = paramiko.SSHClient()
            # auto-accept unknown host keys: hosts are provisioned dynamically
            self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_conn.load_system_host_keys()
            self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": ssh_connect ssh Exception:", text
106
    def load_localinfo(self):
        """Load per-host bookkeeping from <image_path>/.openvim.yaml over SSH
        into self.localinfo (validated against localinfo_schema).
        On any failure, or in test mode, a default empty structure is used."""
        if not self.test:
            try:
                #Connect SSH
                self.ssh_connect()

                # ensure the image directory exists on the host
                command = 'mkdir -p ' + self.image_path
                #print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stderr.read()
                if len(content) > 0:
                    print self.name, ': command:', command, "stderr:", content

                command = 'cat ' + self.image_path + '/.openvim.yaml'
                #print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stdout.read()
                if len(content) == 0:
                    print self.name, ': command:', command, "stderr:", stderr.read()
                    raise paramiko.ssh_exception.SSHException("Error empty file ")
                self.localinfo = yaml.load(content)
                js_v(self.localinfo, localinfo_schema)
                self.localinfo_dirty=False
                if 'server_files' not in self.localinfo:
                    self.localinfo['server_files'] = {}
                print self.name, ': localinfo load from host'
                return

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                print self.name, ": load_localinfo ssh Exception:", text
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": load_localinfo libvirt Exception:", text
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                print self.name, ": load_localinfo yaml format Exception", text
            except js_e.ValidationError as e:
                text = ""
                if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
                print self.name, ": load_localinfo format Exception:", text, e.message
            except Exception as e:
                text = str(e)
                print self.name, ": load_localinfo Exception:", text

        #not loaded, insert a default data and force saving by activating dirty flag
        self.localinfo = {'files':{}, 'server_files':{} }
        #self.localinfo_dirty=True
        self.localinfo_dirty=False
159
    def load_hostinfo(self):
        """Load host description from <image_path>/hostinfo.yaml over SSH into
        self.hostinfo (validated against hostinfo_schema).
        On any failure self.hostinfo is left as None; test mode skips loading."""
        if self.test:
            return;
        try:
            #Connect SSH
            self.ssh_connect()


            command = 'cat ' + self.image_path + '/hostinfo.yaml'
            #print self.name, ': command:', command
            (_, stdout, stderr) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            if len(content) == 0:
                print self.name, ': command:', command, "stderr:", stderr.read()
                raise paramiko.ssh_exception.SSHException("Error empty file ")
            self.hostinfo = yaml.load(content)
            js_v(self.hostinfo, hostinfo_schema)
            print self.name, ': hostlinfo load from host', self.hostinfo
            return

        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": load_hostinfo ssh Exception:", text
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": load_hostinfo libvirt Exception:", text
        except yaml.YAMLError as exc:
            text = ""
            if hasattr(exc, 'problem_mark'):
                mark = exc.problem_mark
                text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
            print self.name, ": load_hostinfo yaml format Exception", text
        except js_e.ValidationError as e:
            text = ""
            if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
            print self.name, ": load_hostinfo format Exception:", text, e.message
        except Exception as e:
            text = str(e)
            print self.name, ": load_hostinfo Exception:", text

        #not loaded, insert a default data
        self.hostinfo = None
202
    def save_localinfo(self, tries=3):
        """Write self.localinfo as YAML to <image_path>/.openvim.yaml over SSH,
        retrying up to 'tries' times and reconnecting if the ssh session died.
        In test mode only the dirty flag is cleared."""
        if self.test:
            self.localinfo_dirty = False
            return

        while tries>=0:
            tries-=1

            try:
                # stream the YAML straight into the remote file through stdin
                command = 'cat > ' + self.image_path + '/.openvim.yaml'
                print self.name, ': command:', command
                (stdin, _, _) = self.ssh_conn.exec_command(command)
                yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
                self.localinfo_dirty = False
                break #while tries

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                print self.name, ": save_localinfo ssh Exception:", text
                if "SSH session not active" in text:
                    self.ssh_connect()
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": save_localinfo libvirt Exception:", text
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                print self.name, ": save_localinfo yaml format Exception", text
            except Exception as e:
                text = str(e)
                print self.name, ": save_localinfo Exception:", text
236
    def load_servers_from_db(self):
        """Refresh self.server_status with {uuid: status} for the instances of
        this host according to the database. Also migrates the legacy
        localinfo['inc_files'] layout into the newer 'server_files' layout."""
        self.db_lock.acquire()
        r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
        self.db_lock.release()

        self.server_status = {}
        if r<0:
            print self.name, ": Error getting data from database:", c
            return
        for server in c:
            self.server_status[ server['uuid'] ] = server['status']

            #convert from old version to new one
            if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
                server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
                # guess the format from the file extension
                if server_files_dict['source file'][-5:] == 'qcow2':
                    server_files_dict['file format'] = 'qcow2'

                self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
        if 'inc_files' in self.localinfo:
            del self.localinfo['inc_files']
            self.localinfo_dirty = True
259
    def delete_unused_files(self):
        '''Compares self.localinfo['server_files'] content with the servers of this
        host at the database (self.server_status).
        Deletes unused entries at self.localinfo and the corresponding local files.
        The only reason for this mismatch is the manual deletion of instances (VM) at database
        '''
        if self.test:
            return
        # python2 items() returns a list copy, so deleting keys inside the loop is safe
        for uuid,images in self.localinfo['server_files'].items():
            if uuid not in self.server_status:
                for localfile in images.values():
                    try:
                        print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid)
                        self.delete_file(localfile['source file'])
                    except paramiko.ssh_exception.SSHException as e:
                        print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e))
                del self.localinfo['server_files'][uuid]
                self.localinfo_dirty = True
277
278 def insert_task(self, task, *aditional):
279 try:
280 self.queueLock.acquire()
281 task = self.taskQueue.put( (task,) + aditional, timeout=5)
282 self.queueLock.release()
283 return 1, None
284 except Queue.Full:
285 return -1, "timeout inserting a task over host " + self.name
286
    def run(self):
        """Main loop of the thread. Loads host state, then consumes tasks from
        taskQueue. When idle it persists dirty localinfo, refreshes server
        status every 5 seconds and processes pending force-off requests.
        OVS bridge/vxlan tasks 'break' the inner loop so host state is reloaded."""
        while True:
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                # pop one task if available, holding the lock only for the pop
                self.queueLock.acquire()
                if not self.taskQueue.empty():
                    task = self.taskQueue.get()
                else:
                    task = None
                self.queueLock.release()

                if task is None:
                    # idle housekeeping, one action per pass
                    now=time.time()
                    if self.localinfo_dirty:
                        self.save_localinfo()
                    elif self.next_update_server_status < now:
                        self.update_servers_status()
                        self.next_update_server_status = now + 5
                    elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                        self.server_forceoff()
                    else:
                        time.sleep(1)
                    continue

                if task[0] == 'instance':
                    print self.name, ": processing task instance", task[1]['action']
                    retry=0
                    # retry once; the second attempt is flagged as the last one
                    while retry <2:
                        retry += 1
                        r=self.action_on_server(task[1], retry==2)
                        if r>=0:
                            break
                elif task[0] == 'image':
                    pass
                elif task[0] == 'exit':
                    print self.name, ": processing task exit"
                    self.terminate()
                    return 0
                elif task[0] == 'reload':
                    print self.name, ": processing task reload terminating and relaunching"
                    self.terminate()
                    break
                elif task[0] == 'edit-iface':
                    print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3])
                    self.edit_iface(task[1], task[2], task[3])
                elif task[0] == 'restore-iface':
                    print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2])
                    self.restore_iface(task[1], task[2])
                elif task[0] == 'new-ovsbridge':
                    print self.name, ": Creating compute OVS bridge"
                    self.create_ovs_bridge()
                    break
                elif task[0] == 'new-vxlan':
                    print self.name, ": Creating vxlan tunnel=" + task[1] + ", remote ip=" + task[2]
                    self.create_ovs_vxlan_tunnel(task[1], task[2])
                    break
                elif task[0] == 'del-ovsbridge':
                    print self.name, ": Deleting OVS bridge"
                    self.delete_ovs_bridge()
                    break
                elif task[0] == 'del-vxlan':
                    print self.name, ": Deleting vxlan " + task[1] + " tunnel"
                    self.delete_ovs_vxlan_tunnel(task[1])
                    break
                elif task[0] == 'create-ovs-bridge-port':
                    print self.name, ": Adding port ovim-" + task[1] + " to OVS bridge"
                    self.create_ovs_bridge_port(task[1])
                elif task[0] == 'del-ovs-port':
                    self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                else:
                    print self.name, ": unknown task", task
361
    def server_forceoff(self, wait_until_finished=False):
        """Process pending_terminate_server: send a forced terminate to every
        server whose scheduled time has arrived.
        :param wait_until_finished: when True, sleep until every pending entry
               can be processed instead of returning at the first future one
        """
        while len(self.pending_terminate_server)>0:
            now = time.time()
            # entries are (time, uuid); the head is the earliest scheduled one
            if self.pending_terminate_server[0][0]>now:
                if wait_until_finished:
                    time.sleep(1)
                    continue
                else:
                    return
            req={'uuid':self.pending_terminate_server[0][1],
                'action':{'terminate':'force'},
                'status': None
            }
            self.action_on_server(req)
            self.pending_terminate_server.pop(0)
377
    def terminate(self):
        """Orderly shutdown: force-off pending servers (waiting for them),
        persist dirty localinfo and close the ssh session (unless test mode)."""
        try:
            self.server_forceoff(True)
            if self.localinfo_dirty:
                self.save_localinfo()
            if not self.test:
                self.ssh_conn.close()
        except Exception as e:
            text = str(e)
            print self.name, ": terminate Exception:", text
        print self.name, ": exit from host_thread"
389
390 def get_local_iface_name(self, generic_name):
391 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
392 return self.hostinfo["iface_names"][generic_name]
393 return generic_name
394
395 def create_xml_server(self, server, dev_list, server_metadata={}):
396 """Function that implements the generation of the VM XML definition.
397 Additional devices are in dev_list list
398 The main disk is upon dev_list[0]"""
399
400 #get if operating system is Windows
401 windows_os = False
402 os_type = server_metadata.get('os_type', None)
403 if os_type == None and 'metadata' in dev_list[0]:
404 os_type = dev_list[0]['metadata'].get('os_type', None)
405 if os_type != None and os_type.lower() == "windows":
406 windows_os = True
407 #get type of hard disk bus
408 bus_ide = True if windows_os else False
409 bus = server_metadata.get('bus', None)
410 if bus == None and 'metadata' in dev_list[0]:
411 bus = dev_list[0]['metadata'].get('bus', None)
412 if bus != None:
413 bus_ide = True if bus=='ide' else False
414
415 self.xml_level = 0
416
417 text = "<domain type='kvm'>"
418 #get topology
419 topo = server_metadata.get('topology', None)
420 if topo == None and 'metadata' in dev_list[0]:
421 topo = dev_list[0]['metadata'].get('topology', None)
422 #name
423 name = server.get('name','') + "_" + server['uuid']
424 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
425 text += self.inc_tab() + "<name>" + name+ "</name>"
426 #uuid
427 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
428
429 numa={}
430 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
431 numa = server['extended']['numas'][0]
432 #memory
433 use_huge = False
434 memory = int(numa.get('memory',0))*1024*1024 #in KiB
435 if memory==0:
436 memory = int(server['ram'])*1024;
437 else:
438 if not self.develop_mode:
439 use_huge = True
440 if memory==0:
441 return -1, 'No memory assigned to instance'
442 memory = str(memory)
443 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
444 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
445 if use_huge:
446 text += self.tab()+'<memoryBacking>'+ \
447 self.inc_tab() + '<hugepages/>'+ \
448 self.dec_tab()+ '</memoryBacking>'
449
450 #cpu
451 use_cpu_pinning=False
452 vcpus = int(server.get("vcpus",0))
453 cpu_pinning = []
454 if 'cores-source' in numa:
455 use_cpu_pinning=True
456 for index in range(0, len(numa['cores-source'])):
457 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
458 vcpus += 1
459 if 'threads-source' in numa:
460 use_cpu_pinning=True
461 for index in range(0, len(numa['threads-source'])):
462 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
463 vcpus += 1
464 if 'paired-threads-source' in numa:
465 use_cpu_pinning=True
466 for index in range(0, len(numa['paired-threads-source'])):
467 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
468 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
469 vcpus += 2
470
471 if use_cpu_pinning and not self.develop_mode:
472 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
473 self.tab()+'<cputune>'
474 self.xml_level += 1
475 for i in range(0, len(cpu_pinning)):
476 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
477 text += self.dec_tab()+'</cputune>'+ \
478 self.tab() + '<numatune>' +\
479 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
480 self.dec_tab() + '</numatune>'
481 else:
482 if vcpus==0:
483 return -1, "Instance without number of cpus"
484 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
485
486 #boot
487 boot_cdrom = False
488 for dev in dev_list:
489 if dev['type']=='cdrom' :
490 boot_cdrom = True
491 break
492 text += self.tab()+ '<os>' + \
493 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
494 if boot_cdrom:
495 text += self.tab() + "<boot dev='cdrom'/>"
496 text += self.tab() + "<boot dev='hd'/>" + \
497 self.dec_tab()+'</os>'
498 #features
499 text += self.tab()+'<features>'+\
500 self.inc_tab()+'<acpi/>' +\
501 self.tab()+'<apic/>' +\
502 self.tab()+'<pae/>'+ \
503 self.dec_tab() +'</features>'
504 if windows_os or topo=="oneSocket":
505 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
506 else:
507 text += self.tab() + "<cpu mode='host-model'></cpu>"
508 text += self.tab() + "<clock offset='utc'/>" +\
509 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
510 self.tab() + "<on_reboot>restart</on_reboot>" + \
511 self.tab() + "<on_crash>restart</on_crash>"
512 text += self.tab() + "<devices>" + \
513 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
514 self.tab() + "<serial type='pty'>" +\
515 self.inc_tab() + "<target port='0'/>" + \
516 self.dec_tab() + "</serial>" +\
517 self.tab() + "<console type='pty'>" + \
518 self.inc_tab()+ "<target type='serial' port='0'/>" + \
519 self.dec_tab()+'</console>'
520 if windows_os:
521 text += self.tab() + "<controller type='usb' index='0'/>" + \
522 self.tab() + "<controller type='ide' index='0'/>" + \
523 self.tab() + "<input type='mouse' bus='ps2'/>" + \
524 self.tab() + "<sound model='ich6'/>" + \
525 self.tab() + "<video>" + \
526 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
527 self.dec_tab() + "</video>" + \
528 self.tab() + "<memballoon model='virtio'/>" + \
529 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
530
531 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
532 #> self.dec_tab()+'</hostdev>\n' +\
533 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
534 if windows_os:
535 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
536 else:
537 #If image contains 'GRAPH' include graphics
538 #if 'GRAPH' in image:
539 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
540 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
541 self.dec_tab() + "</graphics>"
542
543 vd_index = 'a'
544 for dev in dev_list:
545 bus_ide_dev = bus_ide
546 if dev['type']=='cdrom' or dev['type']=='disk':
547 if dev['type']=='cdrom':
548 bus_ide_dev = True
549 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
550 if 'file format' in dev:
551 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
552 if 'source file' in dev:
553 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
554 #elif v['type'] == 'block':
555 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
556 #else:
557 # return -1, 'Unknown disk type ' + v['type']
558 vpci = dev.get('vpci',None)
559 if vpci == None:
560 vpci = dev['metadata'].get('vpci',None)
561 text += self.pci2xml(vpci)
562
563 if bus_ide_dev:
564 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
565 else:
566 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
567 text += self.dec_tab() + '</disk>'
568 vd_index = chr(ord(vd_index)+1)
569 elif dev['type']=='xml':
570 dev_text = dev['xml']
571 if 'vpci' in dev:
572 dev_text = dev_text.replace('__vpci__', dev['vpci'])
573 if 'source file' in dev:
574 dev_text = dev_text.replace('__file__', dev['source file'])
575 if 'file format' in dev:
576 dev_text = dev_text.replace('__format__', dev['source file'])
577 if '__dev__' in dev_text:
578 dev_text = dev_text.replace('__dev__', vd_index)
579 vd_index = chr(ord(vd_index)+1)
580 text += dev_text
581 else:
582 return -1, 'Unknown device type ' + dev['type']
583
584 net_nb=0
585 bridge_interfaces = server.get('networks', [])
586 for v in bridge_interfaces:
587 #Get the brifge name
588 self.db_lock.acquire()
589 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
590 self.db_lock.release()
591 if result <= 0:
592 print "create_xml_server ERROR getting nets",result, content
593 return -1, content
594 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
595 #I know it is not secure
596 #for v in sorted(desc['network interfaces'].itervalues()):
597 model = v.get("model", None)
598 if content[0]['provider']=='default':
599 text += self.tab() + "<interface type='network'>" + \
600 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
601 elif content[0]['provider'][0:7]=='macvtap':
602 text += self.tab()+"<interface type='direct'>" + \
603 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
604 self.tab() + "<target dev='macvtap0'/>"
605 if windows_os:
606 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
607 elif model==None:
608 model = "virtio"
609 elif content[0]['provider'][0:6]=='bridge':
610 text += self.tab() + "<interface type='bridge'>" + \
611 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
612 if windows_os:
613 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
614 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
615 elif model==None:
616 model = "virtio"
617 elif content[0]['provider'][0:3] == "OVS":
618 vlan = content[0]['provider'].replace('OVS:', '')
619 text += self.tab() + "<interface type='bridge'>" + \
620 self.inc_tab() + "<source bridge='ovim-" + vlan + "'/>"
621 else:
622 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
623 if model!=None:
624 text += self.tab() + "<model type='" +model+ "'/>"
625 if v.get('mac_address', None) != None:
626 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
627 text += self.pci2xml(v.get('vpci',None))
628 text += self.dec_tab()+'</interface>'
629
630 net_nb += 1
631
632 interfaces = numa.get('interfaces', [])
633
634 net_nb=0
635 for v in interfaces:
636 if self.develop_mode: #map these interfaces to bridges
637 text += self.tab() + "<interface type='bridge'>" + \
638 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
639 if windows_os:
640 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
641 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
642 else:
643 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
644 if v.get('mac_address', None) != None:
645 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
646 text += self.pci2xml(v.get('vpci',None))
647 text += self.dec_tab()+'</interface>'
648 continue
649
650 if v['dedicated'] == 'yes': #passthrought
651 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
652 self.inc_tab() + "<source>"
653 self.inc_tab()
654 text += self.pci2xml(v['source'])
655 text += self.dec_tab()+'</source>'
656 text += self.pci2xml(v.get('vpci',None))
657 if windows_os:
658 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
659 text += self.dec_tab()+'</hostdev>'
660 net_nb += 1
661 else: #sriov_interfaces
662 #skip not connected interfaces
663 if v.get("net_id") == None:
664 continue
665 text += self.tab() + "<interface type='hostdev' managed='yes'>"
666 self.inc_tab()
667 if v.get('mac_address', None) != None:
668 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
669 text+= self.tab()+'<source>'
670 self.inc_tab()
671 text += self.pci2xml(v['source'])
672 text += self.dec_tab()+'</source>'
673 if v.get('vlan',None) != None:
674 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
675 text += self.pci2xml(v.get('vpci',None))
676 if windows_os:
677 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
678 text += self.dec_tab()+'</interface>'
679
680
681 text += self.dec_tab()+'</devices>'+\
682 self.dec_tab()+'</domain>'
683 return 0, text
684
685 def pci2xml(self, pci):
686 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
687 alows an empty pci text'''
688 if pci is None:
689 return ""
690 first_part = pci.split(':')
691 second_part = first_part[2].split('.')
692 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
693 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
694 "' function='0x" + second_part[1] + "'/>"
695
    def tab(self):
        """Return a newline followed by the indentation of the current xml_level."""
        return "\n" + (' '*self.xml_level)
699
700 def inc_tab(self):
701 """Increment and return indentation according to xml_level"""
702 self.xml_level += 1
703 return self.tab()
704
705 def dec_tab(self):
706 """Decrement and return indentation according to xml_level"""
707 self.xml_level -= 1
708 return self.tab()
709
710 def create_ovs_bridge(self):
711 """
712 Create a bridge in compute OVS to allocate VMs
713 :return: True if success
714 """
715 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
716 print self.name, ': command:', command
717 (_, stdout, _) = self.ssh_conn.exec_command(command)
718 content = stdout.read()
719 if len(content) == 0:
720 return True
721 else:
722 return False
723
724 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
725 """
726 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
727 :param vlan: vlan port id
728 :param net_uuid: network id
729 :return:
730 """
731
732 command = 'sudo ovs-vsctl del-port br-int ovim-' + vlan
733 print self.name, ': command:', command
734 (_, stdout, _) = self.ssh_conn.exec_command(command)
735 content = stdout.read()
736 if len(content) == 0:
737 return True
738 else:
739 return False
740
741 def is_port_free(self, vlan, net_uuid):
742 """
743 Check if por is free before delete from the compute.
744 :param vlan: vlan port id
745 :param net_uuid: network id
746 :return: True if is not free
747 """
748 self.db_lock.acquire()
749 result, content = self.db.get_table(
750 FROM='ports as p join instances as i on p.instance_id=i.uuid',
751 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:bridge', 'p.net_id': net_uuid}
752 )
753 self.db_lock.release()
754
755 if content > 0:
756 return False
757 else:
758 return True
759
760 def add_port_to_ovs_bridge(self, vlan):
761 """
762 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
763 :param vlan: vlan port id
764 :return:
765 """
766 command = 'sudo ovs-vsctl add-port br-int ovim-' + vlan + ' tag=' + vlan
767 print self.name, ': command:', command
768 (_, stdout, _) = self.ssh_conn.exec_command(command)
769 content = stdout.read()
770 if len(content) == 0:
771 return True
772 else:
773 return False
774
775 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
776 """
777 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
778 :param vlan:
779 :param net_uuid:
780 :return: True if success
781 """
782 if not self.is_port_free(vlan, net_uuid):
783 return True
784 self.delete_port_to_ovs_bridge(vlan, net_uuid)
785 self.delete_linux_bridge(vlan)
786 return True
787
788 def delete_linux_bridge(self, vlan):
789 """
790 Delete a linux bridge in a scpecific compute.
791 :param vlan: vlan port id
792 :return: True if success
793 """
794 command = 'sudo ifconfig ovim-' + vlan + ' down && sudo brctl delbr ovim-' + vlan
795 print self.name, ': command:', command
796 (_, stdout, _) = self.ssh_conn.exec_command(command)
797 content = stdout.read()
798 if len(content) == 0:
799 return True
800 else:
801 return False
802
    def create_ovs_bridge_port(self, vlan):
        """
        Generate a linux bridge and attach the port to a OVS bridge.
        Helper results are not checked here (best effort).
        :param vlan: vlan port id
        :return: None
        """
        self.create_linux_bridge(vlan)
        self.add_port_to_ovs_bridge(vlan)
811
812 def create_linux_bridge(self, vlan):
813 """
814 Create a linux bridge with STP active
815 :param vlan: netowrk vlan id
816 :return:
817 """
818 command = 'sudo brctl addbr ovim-' + vlan + ' && sudo ifconfig ovim-' + vlan + ' up'
819 print self.name, ': command:', command
820 (_, stdout, _) = self.ssh_conn.exec_command(command)
821 content = stdout.read()
822
823 if len(content) != 0:
824 return False
825
826 command = 'sudo brctl stp ovim-' + vlan + ' on'
827 print self.name, ': command:', command
828 (_, stdout, _) = self.ssh_conn.exec_command(command)
829 content = stdout.read()
830
831 if len(content) == 0:
832 return True
833 else:
834 return False
835
836 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
837 """
838 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
839 :param vxlan_interface: vlxan inteface name.
840 :param remote_ip: tunnel endpoint remote compute ip.
841 :return:
842 """
843 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
844 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
845 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
846 print self.name, ': command:', command
847 (_, stdout, _) = self.ssh_conn.exec_command(command)
848 content = stdout.read()
849 print content
850 if len(content) == 0:
851 return True
852 else:
853 return False
854
855 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
856 """
857 Delete a vlxan tunnel port from a OVS brdige.
858 :param vxlan_interface: vlxan name to be delete it.
859 :return: True if success.
860 """
861 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
862 print self.name, ': command:', command
863 (_, stdout, _) = self.ssh_conn.exec_command(command)
864 content = stdout.read()
865 print content
866 if len(content) == 0:
867 return True
868 else:
869 return False
870
871 def delete_ovs_bridge(self):
872 """
873 Delete a OVS bridge from a compute.
874 :return: True if success
875 """
876 command = 'sudo ovs-vsctl del-br br-int'
877 print self.name, ': command:', command
878 (_, stdout, _) = self.ssh_conn.exec_command(command)
879 content = stdout.read()
880 if len(content) == 0:
881 return True
882 else:
883 return False
884
885 def get_file_info(self, path):
886 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
887 print self.name, ': command:', command
888 (_, stdout, _) = self.ssh_conn.exec_command(command)
889 content = stdout.read()
890 if len(content) == 0:
891 return None # file does not exist
892 else:
893 return content.split(" ") #(permission, 1, owner, group, size, date, file)
894
895 def qemu_get_info(self, path):
896 command = 'qemu-img info ' + path
897 print self.name, ': command:', command
898 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
899 content = stdout.read()
900 if len(content) == 0:
901 error = stderr.read()
902 print self.name, ": get_qemu_info error ", error
903 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
904 else:
905 try:
906 return yaml.load(content)
907 except yaml.YAMLError as exc:
908 text = ""
909 if hasattr(exc, 'problem_mark'):
910 mark = exc.problem_mark
911 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
912 print self.name, ": get_qemu_info yaml format Exception", text
913 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
914
915 def qemu_change_backing(self, inc_file, new_backing_file):
916 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
917 print self.name, ': command:', command
918 (_, _, stderr) = self.ssh_conn.exec_command(command)
919 content = stderr.read()
920 if len(content) == 0:
921 return 0
922 else:
923 print self.name, ": qemu_change_backing error: ", content
924 return -1
925
926 def get_notused_filename(self, proposed_name, suffix=''):
927 '''Look for a non existing file_name in the host
928 proposed_name: proposed file name, includes path
929 suffix: suffix to be added to the name, before the extention
930 '''
931 extension = proposed_name.rfind(".")
932 slash = proposed_name.rfind("/")
933 if extension < 0 or extension < slash: # no extension
934 extension = len(proposed_name)
935 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
936 info = self.get_file_info(target_name)
937 if info is None:
938 return target_name
939
940 index=0
941 while info is not None:
942 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
943 index+=1
944 info = self.get_file_info(target_name)
945 return target_name
946
947 def get_notused_path(self, proposed_path, suffix=''):
948 '''Look for a non existing path at database for images
949 proposed_path: proposed file name, includes path
950 suffix: suffix to be added to the name, before the extention
951 '''
952 extension = proposed_path.rfind(".")
953 if extension < 0:
954 extension = len(proposed_path)
955 if suffix != None:
956 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
957 index=0
958 while True:
959 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
960 if r<=0:
961 return target_path
962 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
963 index+=1
964
965
966 def delete_file(self, file_name):
967 command = 'rm -f '+file_name
968 print self.name, ': command:', command
969 (_, _, stderr) = self.ssh_conn.exec_command(command)
970 error_msg = stderr.read()
971 if len(error_msg) > 0:
972 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
973
974 def copy_file(self, source, destination, perserve_time=True):
975 if source[0:4]=="http":
976 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
977 dst=destination, src=source, dst_result=destination + ".result" )
978 else:
979 command = 'cp --no-preserve=mode'
980 if perserve_time:
981 command += ' --preserve=timestamps'
982 command += " '{}' '{}'".format(source, destination)
983 print self.name, ': command:', command
984 (_, _, stderr) = self.ssh_conn.exec_command(command)
985 error_msg = stderr.read()
986 if len(error_msg) > 0:
987 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
988
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to the local image folder,
            recursively copying the backing files when the remote file is
            an incremental (qcow2 with backing chain) image.
            Reads and/or modifies self.localinfo['files'], the map from
            repository path to the unmodified local copy of each image.
            params:
                remote_file: path of remote file
                use_incremental: None (leave the decision to this function), True, False
            return:
                local_file: name of local file
                qemu_info: dict with qemu information of local file
                use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''
        
        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            # http sources cannot be stat'ed or inspected with qemu-img remotely
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            # backing files are always copied as-is (use_incremental=True)
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None  # cached copy disappeared from disk
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass  # best effort: a stale entry/file is tolerated
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None
                

        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

        if use_incremental_out:
            # remember the unmodified local copy for future reuse
            self.localinfo['files'][remote_file] = local_file
        if new_backing_file:
            # repoint the local copy to the locally-copied backing file
            self.qemu_change_backing(local_file, new_backing_file)
        qemu_info = self.qemu_get_info(local_file)
        
        return local_file, qemu_info, use_incremental_out
1056
    def launch_server(self, conn, server, rebuild=False, domain=None):
        """Create (or resume/rebuild) a libvirt domain for 'server' on this host.

        Copies the needed images to the host, optionally creates incremental
        qcow2 files on top of them, generates the domain XML and starts it.
        :param conn: open libvirt connection to this host.
        :param server: instance dict; uses 'uuid', 'paused', 'image_id', 'metadata'.
        :param rebuild: when True, previous incremental files are deleted and
                the domain is created from scratch.
        :param domain: existing libvirt domain; if given and not rebuilding,
                it is simply resumed.
        :return: (0,'Success') on success; (negative, error_text) on failure.
        """
        if self.test:
            time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
            return 0, 'Success'

        server_id = server['uuid']
        paused = server.get('paused','no')
        try:
            if domain!=None and rebuild==False:
                # domain already defined: just resume it
                domain.resume()
                #self.server_status[server_id] = 'ACTIVE'
                return 0, 'Success'

            self.db_lock.acquire()
            result, server_data = self.db.get_instance(server_id)
            self.db_lock.release()
            if result <= 0:
                print self.name, ": launch_server ERROR getting server from DB",result, server_data
                return result, server_data

            #0: get image metadata
            server_metadata = server.get('metadata', {})
            use_incremental = None
            if "use_incremental" in server_metadata:
                use_incremental = False if server_metadata["use_incremental"]=="no" else True

            server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
            if rebuild:
                #delete previous incremental files
                for file_ in server_host_files.values():
                    self.delete_file(file_['source file'] )
                server_host_files={}
    
            #1: obtain aditional devices (disks)
            #Put as first device the main disk
            devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
            if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
                devices += server_data['extended']['devices']

            for dev in devices:
                if dev['image_id'] == None:
                    continue
                
                self.db_lock.acquire()
                result, content = self.db.get_table(FROM='images', SELECT=('path','metadata'),WHERE={'uuid':dev['image_id']} )
                self.db_lock.release()
                if result <= 0:
                    # NOTE(review): this builds a tuple, not a string; callers
                    # receive a tuple as the error text — confirm intended
                    error_text = "ERROR", result, content, "when getting image", dev['image_id']
                    print self.name, ": launch_server", error_text
                    return -1, error_text
                if content[0]['metadata'] is not None:
                    dev['metadata'] = json.loads(content[0]['metadata'])
                else:
                    dev['metadata'] = {}
                
                if dev['image_id'] in server_host_files:
                    # image already copied for this server: reuse it
                    dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
                    dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
                    continue
                
                #2: copy image to host
                remote_file = content[0]['path']
                use_incremental_image = use_incremental
                if dev['metadata'].get("use_incremental") == "no":
                    use_incremental_image = False
                local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
                
                #create incremental image backed by the pristine local copy
                if use_incremental_image:
                    local_file_inc = self.get_notused_filename(local_file, '.inc')
                    command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
                    print 'command:', command
                    (_, _, stderr) = self.ssh_conn.exec_command(command)
                    error_msg = stderr.read()
                    if len(error_msg) > 0:
                        raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
                    local_file = local_file_inc
                    qemu_info = {'file format':'qcow2'}
                
                server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}

                dev['source file'] = local_file
                dev['file format'] = qemu_info['file format']

            self.localinfo['server_files'][ server['uuid'] ] = server_host_files
            self.localinfo_dirty = True  # persisted later by the thread loop

            #3 Create XML
            result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
            if result <0:
                print self.name, ": create xml server error:", xml
                return -2, xml
            print self.name, ": create xml:", xml
            atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
            #4 Start the domain
            if not rebuild: #ensures that any pending destroying server is done
                self.server_forceoff(True)
            #print self.name, ": launching instance" #, xml
            conn.createXML(xml, atribute)
            #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'

            return 0, 'Success'

        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
            if "SSH session not active" in text:
                self.ssh_connect()
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
        except Exception as e:
            text = str(e)
            print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
        # reached only after one of the except branches set 'text'
        return -1, text
1173
    def update_servers_status(self):
        """Poll libvirt for all domains on this host and sync self.server_status
        and the instances DB table with any state changes."""
        # # virDomainState
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            domains= conn.listAllDomains()
            domain_dict={}
            # map every defined domain uuid to an openvim status string
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            print self.name, ": get_state() Exception '", e.get_error_message()
            return

        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                # domain not defined at libvirt anymore
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            #change status
            print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            if r>=0:
                # only mirror the status locally once the DB accepted it
                self.server_status[server_id] = new_status
1233
    def action_on_server(self, req, last_retry=True):
        '''Perform an action on a req
        Attributes:
            req: dictionary that contain:
                server properties: 'uuid','name','tenant_id','status'
                action: 'action'
                host properties: 'user', 'ip_name'
            last_retry: when False and the action ends in ERROR, the database
                is not updated so a later retry can still succeed
        return (error, text)
             0: No error. VM is updated to new state,
            -1: Invalid action, as trying to pause a PAUSED VM
            -2: Error accessing host
            -3: VM nor present
            -4: Error at DB access
            -5: Error while trying to perform action. VM is updated to ERROR
        '''
        server_id = req['uuid']
        conn = None
        new_status = None
        old_status = req['status']
        last_error = None

        if self.test:
            # test mode: fake the state transitions without touching libvirt
            if 'terminate' in req['action']:
                new_status = 'deleted'
            elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
                if req['status']!='ERROR':
                    time.sleep(5)
                    new_status = 'INACTIVE'
            elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
            elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
            elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
            elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
            elif 'rebuild' in req['action']:
                time.sleep(random.randint(20,150))
                new_status = 'ACTIVE'
            elif 'createImage' in req['action']:
                time.sleep(5)
                self.create_image(None, req)
        else:
            try:
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                try:
                    dom = conn.lookupByUUIDString(server_id)
                except host_thread.lvirt_module.libvirtError as e:
                    text = e.get_error_message()
                    # 'No existe un dominio coincidente' is the Spanish-locale
                    # variant of 'Domain not found'
                    if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                        dom = None
                    else:
                        print self.name, ": action_on_server(",server_id,") libvirt exception:", text
                        raise e

                if 'forceOff' in req['action']:
                    if dom == None:
                        print self.name, ": action_on_server(",server_id,") domain not running"
                    else:
                        try:
                            print self.name, ": sending DESTROY to server", server_id
                            dom.destroy()
                        except Exception as e:
                            # an already-stopped domain is not an error here
                            if "domain is not running" not in e.get_error_message():
                                print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
                                last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                                new_status = 'ERROR'

                elif 'terminate' in req['action']:
                    if dom == None:
                        print self.name, ": action_on_server(",server_id,") domain not running"
                        new_status = 'deleted'
                    else:
                        try:
                            if req['action']['terminate'] == 'force':
                                print self.name, ": sending DESTROY to server", server_id
                                dom.destroy()
                                new_status = 'deleted'
                            else:
                                # graceful shutdown; revisit in ~10s to undefine
                                print self.name, ": sending SHUTDOWN to server", server_id
                                dom.shutdown()
                                self.pending_terminate_server.append( (time.time()+10,server_id) )
                        except Exception as e:
                            print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
                            last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                            new_status = 'ERROR'
                            if "domain is not running" in e.get_error_message():
                                try:
                                    dom.undefine()
                                    new_status = 'deleted'
                                except Exception:
                                    print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
                                    # NOTE(review): the comma makes this a tuple,
                                    # not a string — likely a latent bug; confirm
                                    last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
                                #Exception: 'virDomainDetachDevice() failed'
                    if new_status=='deleted':
                        # drop local status and delete the server's image files
                        if server_id in self.server_status:
                            del self.server_status[server_id]
                        if req['uuid'] in self.localinfo['server_files']:
                            for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
                                try:
                                    self.delete_file(file_['source file'])
                                except Exception:
                                    pass
                            del self.localinfo['server_files'][ req['uuid'] ]
                            self.localinfo_dirty = True

                elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
                    try:
                        if dom == None:
                            print self.name, ": action_on_server(",server_id,") domain not running"
                        else:
                            dom.shutdown()
#                            new_status = 'INACTIVE'
                        #TODO: check status for changing at database
                    except Exception as e:
                        new_status = 'ERROR'
                        print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
                        last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()

                elif 'rebuild' in req['action']:
                    if dom != None:
                        dom.destroy()
                    r = self.launch_server(conn, req, True, None)
                    if r[0] <0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'
                elif 'start' in req['action']:
                    # The instance is only create in DB but not yet at libvirt domain, needs to be create
                    rebuild = True if req['action']['start'] == 'rebuild' else False
                    r = self.launch_server(conn, req, rebuild, dom)
                    if r[0] <0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'

                elif 'resume' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.resume()
#                            new_status = 'ACTIVE'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()

                elif 'pause' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.suspend()
#                            new_status = 'PAUSED'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()

                elif 'reboot' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.reboot()
                        print self.name, ": action_on_server(",server_id,") reboot:"
                        #new_status = 'ACTIVE'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
                elif 'createImage' in req['action']:
                    self.create_image(dom, req)


                conn.close()
            except host_thread.lvirt_module.libvirtError as e:
                if conn is not None: conn.close()
                text = e.get_error_message()
                new_status = "ERROR"
                last_error = text
                print self.name, ": action_on_server(",server_id,") Exception '", text
                if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                    print self.name, ": action_on_server(",server_id,") Exception removed from host"
        #end of if self.test
        if new_status == None:
            # no state change to record
            return 1

        print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
        UPDATE = {'progress':100, 'status':new_status}

        if new_status=='ERROR':
            if not last_retry: #if there will be another retry do not update database
                return -1
            elif 'terminate' in req['action']:
                #PUT a log in the database
                print self.name, ": PANIC deleting server", server_id, last_error
                self.db_lock.acquire()
                self.db.new_row('logs',
                        {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
                         'description':'PANIC deleting server from host '+self.name+': '+last_error}
                )
                self.db_lock.release()
                if server_id in self.server_status:
                    del self.server_status[server_id]
                return -1
            else:
                UPDATE['last_error'] = last_error
        if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
            self.db_lock.acquire()
            self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
            self.server_status[server_id] = new_status
            self.db_lock.release()
        if new_status == 'ERROR':
            return -1
        return 1
1443
1444
    def restore_iface(self, name, mac, lib_conn=None):
        ''' Destroy and re-create a host interface (the libvirt equivalent of
            ifdown/ifup) to restore its default parameters.
            Params:
                name: interface name, used only for logging
                mac: mac address of the interface
                lib_conn: connection to the libvirt, if None a new connection is created
            Return 0,None if ok, -1,text if fails
        '''
        conn=None
        ret = 0
        error_text=None
        if self.test:
            print self.name, ": restore_iface '%s' %s" % (name, mac)
            return 0, None
        try:
            if not lib_conn:
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            else:
                conn = lib_conn

            #wait to the pending VM deletion
            #TODO.Revise self.server_forceoff(True)

            iface = conn.interfaceLookupByMACString(mac)
            iface.destroy()
            iface.create()
            print self.name, ": restore_iface '%s' %s" % (name, mac)
        except host_thread.lvirt_module.libvirtError as e:
            error_text = e.get_error_message()
            print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
            ret=-1
        finally:
            # only close connections this method opened itself
            if lib_conn is None and conn is not None:
                conn.close()
            # NOTE(review): returning from inside 'finally' swallows any
            # exception other than libvirtError — confirm this is intended
            return ret, error_text
1479
1480
    def create_image(self,dom, req):
        """Snapshot a server disk into a new image file and record it at the
        'images' DB table.

        :param dom: libvirt domain of the server (unused in test mode).
        :param req: request dict; uses req['action']['createImage'] (with
                'path' or 'source'/'name') and req['new_image']['uuid'].
        """
        if self.test:
            # test mode: only compute the destination path, no copy is done
            if 'path' in req['action']['createImage']:
                file_dst = req['action']['createImage']['path']
            else:
                createImage=req['action']['createImage']
                img_name= createImage['source']['path']
                index=img_name.rfind('/')
                file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
            image_status='ACTIVE'
        else:
            # one retry, used to reconnect a dropped ssh session
            for retry in (0,1):
                try:
                    server_id = req['uuid']
                    createImage=req['action']['createImage']
                    file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
                    if 'path' in req['action']['createImage']:
                        file_dst = req['action']['createImage']['path']
                    else:
                        img_name= createImage['source']['path']
                        index=img_name.rfind('/')
                        file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')

                    self.copy_file(file_orig, file_dst)
                    qemu_info = self.qemu_get_info(file_orig)
                    if 'backing file' in qemu_info:
                        # repoint the copy at the repository path of its backing
                        # file (localinfo['files'] maps repo path -> local path)
                        for k,v in self.localinfo['files'].items():
                            if v==qemu_info['backing file']:
                                self.qemu_change_backing(file_dst, k)
                                break
                    image_status='ACTIVE'
                    break
                except paramiko.ssh_exception.SSHException as e:
                    image_status='ERROR'
                    error_text = e.args[0]
                    print self.name, "': create_image(",server_id,") ssh Exception:", error_text
                    if "SSH session not active" in error_text and retry==0:
                        self.ssh_connect()
                except Exception as e:
                    image_status='ERROR'
                    error_text = str(e)
                    print self.name, "': create_image(",server_id,") Exception:", error_text

        #TODO insert a last_error at database
        # NOTE(review): if the very first statements above raised, file_dst may
        # be unbound here and this update would raise NameError — confirm
        self.db_lock.acquire()
        self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
                {'uuid':req['new_image']['uuid']}, log=True)
        self.db_lock.release()
1529
    def edit_iface(self, port_id, old_net, new_net):
        """Re-plug an SR-IOV (VF) interface of a running VM so it picks up new
        network parameters: the hostdev device is detached and re-attached
        live with the vlan tag stored at the ports table.

        :param port_id: uuid of the port at the ports DB table (model must be VF).
        :param old_net: truthy when the iface must be detached from its old net.
        :param new_net: truthy when the iface must be attached to a new net.
        """
        #This action imply remove and insert interface to put proper parameters
        if self.test:
            time.sleep(1)
        else:
            #get iface details
            self.db_lock.acquire()
            r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                    WHERE={'port_id': port_id})
            self.db_lock.release()
            if r<0:
                print self.name, ": edit_iface(",port_id,") DDBB error:", c
                return
            elif r==0:
                print self.name, ": edit_iface(",port_id,") por not found"
                return
            port=c[0]
            if port["model"]!="VF":
                print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
                return
            #create xml detach file
            xml=[]
            self.xml_level = 2
            xml.append("<interface type='hostdev' managed='yes'>")
            xml.append(" <mac address='" +port['mac']+ "'/>")
            xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
            xml.append('</interface>')


            try:
                conn=None
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                dom = conn.lookupByUUIDString(port["instance_id"])
                if old_net:
                    text="\n".join(xml)
                    print self.name, ": edit_iface detaching SRIOV interface", text
                    dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
                if new_net:
                    # replace the closing tag with the vlan element, then
                    # re-close the interface after adding the (v)pci address
                    xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
                    self.xml_level = 1
                    xml.append(self.pci2xml(port.get('vpci',None)) )
                    xml.append('</interface>')
                    text="\n".join(xml)
                    print self.name, ": edit_iface attaching SRIOV interface", text
                    dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text

            finally:
                if conn is not None: conn.close()
1582
1583
1584 def create_server(server, db, db_lock, only_of_ports):
1585 #print "server"
1586 #print "server"
1587 #print server
1588 #print "server"
1589 #print "server"
1590 #try:
1591 # host_id = server.get('host_id', None)
1592 extended = server.get('extended', None)
1593
1594 # print '----------------------'
1595 # print json.dumps(extended, indent=4)
1596
1597 requirements={}
1598 requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1599 requirements['ram'] = server['flavor'].get('ram', 0)
1600 if requirements['ram']== None:
1601 requirements['ram'] = 0
1602 requirements['vcpus'] = server['flavor'].get('vcpus', 0)
1603 if requirements['vcpus']== None:
1604 requirements['vcpus'] = 0
1605 #If extended is not defined get requirements from flavor
1606 if extended is None:
1607 #If extended is defined in flavor convert to dictionary and use it
1608 if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
1609 json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
1610 extended = json.loads(json_acceptable_string)
1611 else:
1612 extended = None
1613 #print json.dumps(extended, indent=4)
1614
1615 #For simplicity only one numa VM are supported in the initial implementation
1616 if extended != None:
1617 numas = extended.get('numas', [])
1618 if len(numas)>1:
1619 return (-2, "Multi-NUMA VMs are not supported yet")
1620 #elif len(numas)<1:
1621 # return (-1, "At least one numa must be specified")
1622
1623 #a for loop is used in order to be ready to multi-NUMA VMs
1624 request = []
1625 for numa in numas:
1626 numa_req = {}
1627 numa_req['memory'] = numa.get('memory', 0)
1628 if 'cores' in numa:
1629 numa_req['proc_req_nb'] = numa['cores'] #number of cores or threads to be reserved
1630 numa_req['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1631 numa_req['proc_req_list'] = numa.get('cores-id', None) #list of ids to be assigned to the cores or threads
1632 elif 'paired-threads' in numa:
1633 numa_req['proc_req_nb'] = numa['paired-threads']
1634 numa_req['proc_req_type'] = 'paired-threads'
1635 numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
1636 elif 'threads' in numa:
1637 numa_req['proc_req_nb'] = numa['threads']
1638 numa_req['proc_req_type'] = 'threads'
1639 numa_req['proc_req_list'] = numa.get('threads-id', None)
1640 else:
1641 numa_req['proc_req_nb'] = 0 # by default
1642 numa_req['proc_req_type'] = 'threads'
1643
1644
1645
1646 #Generate a list of sriov and another for physical interfaces
1647 interfaces = numa.get('interfaces', [])
1648 sriov_list = []
1649 port_list = []
1650 for iface in interfaces:
1651 iface['bandwidth'] = int(iface['bandwidth'])
1652 if iface['dedicated'][:3]=='yes':
1653 port_list.append(iface)
1654 else:
1655 sriov_list.append(iface)
1656
1657 #Save lists ordered from more restrictive to less bw requirements
1658 numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
1659 numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)
1660
1661
1662 request.append(numa_req)
1663
1664 # print "----------\n"+json.dumps(request[0], indent=4)
1665 # print '----------\n\n'
1666
1667 #Search in db for an appropriate numa for each requested numa
1668 #at the moment multi-NUMA VMs are not supported
1669 if len(request)>0:
1670 requirements['numa'].update(request[0])
1671 if requirements['numa']['memory']>0:
1672 requirements['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1673 elif requirements['ram']==0:
1674 return (-1, "Memory information not set neither at extended field not at ram")
1675 if requirements['numa']['proc_req_nb']>0:
1676 requirements['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1677 elif requirements['vcpus']==0:
1678 return (-1, "Processor information not set neither at extended field not at vcpus")
1679
1680
1681 db_lock.acquire()
1682 result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
1683 db_lock.release()
1684
1685 if result == -1:
1686 return (-1, content)
1687
1688 numa_id = content['numa_id']
1689 host_id = content['host_id']
1690
1691 #obtain threads_id and calculate pinning
1692 cpu_pinning = []
1693 reserved_threads=[]
1694 if requirements['numa']['proc_req_nb']>0:
1695 db_lock.acquire()
1696 result, content = db.get_table(FROM='resources_core',
1697 SELECT=('id','core_id','thread_id'),
1698 WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
1699 db_lock.release()
1700 if result <= 0:
1701 print content
1702 return -1, content
1703
1704 #convert rows to a dictionary indexed by core_id
1705 cores_dict = {}
1706 for row in content:
1707 if not row['core_id'] in cores_dict:
1708 cores_dict[row['core_id']] = []
1709 cores_dict[row['core_id']].append([row['thread_id'],row['id']])
1710
1711 #In case full cores are requested
1712 paired = 'N'
1713 if requirements['numa']['proc_req_type'] == 'cores':
1714 #Get/create the list of the vcpu_ids
1715 vcpu_id_list = requirements['numa']['proc_req_list']
1716 if vcpu_id_list == None:
1717 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1718
1719 for threads in cores_dict.itervalues():
1720 #we need full cores
1721 if len(threads) != 2:
1722 continue
1723
1724 #set pinning for the first thread
1725 cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )
1726
1727 #reserve so it is not used the second thread
1728 reserved_threads.append(threads[1][1])
1729
1730 if len(vcpu_id_list) == 0:
1731 break
1732
1733 #In case paired threads are requested
1734 elif requirements['numa']['proc_req_type'] == 'paired-threads':
1735 paired = 'Y'
1736 #Get/create the list of the vcpu_ids
1737 if requirements['numa']['proc_req_list'] != None:
1738 vcpu_id_list = []
1739 for pair in requirements['numa']['proc_req_list']:
1740 if len(pair)!=2:
1741 return -1, "Field paired-threads-id not properly specified"
1742 return
1743 vcpu_id_list.append(pair[0])
1744 vcpu_id_list.append(pair[1])
1745 else:
1746 vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))
1747
1748 for threads in cores_dict.itervalues():
1749 #we need full cores
1750 if len(threads) != 2:
1751 continue
1752 #set pinning for the first thread
1753 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1754
1755 #set pinning for the second thread
1756 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1757
1758 if len(vcpu_id_list) == 0:
1759 break
1760
1761 #In case normal threads are requested
1762 elif requirements['numa']['proc_req_type'] == 'threads':
1763 #Get/create the list of the vcpu_ids
1764 vcpu_id_list = requirements['numa']['proc_req_list']
1765 if vcpu_id_list == None:
1766 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1767
1768 for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
1769 threads = cores_dict[threads_index]
1770 #set pinning for the first thread
1771 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1772
1773 #if exists, set pinning for the second thread
1774 if len(threads) == 2 and len(vcpu_id_list) != 0:
1775 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1776
1777 if len(vcpu_id_list) == 0:
1778 break
1779
1780 #Get the source pci addresses for the selected numa
1781 used_sriov_ports = []
1782 for port in requirements['numa']['sriov_list']:
1783 db_lock.acquire()
1784 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1785 db_lock.release()
1786 if result <= 0:
1787 print content
1788 return -1, content
1789 for row in content:
1790 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1791 continue
1792 port['pci'] = row['pci']
1793 if 'mac_address' not in port:
1794 port['mac_address'] = row['mac']
1795 del port['mac']
1796 port['port_id']=row['id']
1797 port['Mbps_used'] = port['bandwidth']
1798 used_sriov_ports.append(row['id'])
1799 break
1800
1801 for port in requirements['numa']['port_list']:
1802 port['Mbps_used'] = None
1803 if port['dedicated'] != "yes:sriov":
1804 port['mac_address'] = port['mac']
1805 del port['mac']
1806 continue
1807 db_lock.acquire()
1808 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1809 db_lock.release()
1810 if result <= 0:
1811 print content
1812 return -1, content
1813 port['Mbps_used'] = content[0]['Mbps']
1814 for row in content:
1815 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1816 continue
1817 port['pci'] = row['pci']
1818 if 'mac_address' not in port:
1819 port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports
1820 del port['mac']
1821 port['port_id']=row['id']
1822 used_sriov_ports.append(row['id'])
1823 break
1824
1825 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1826 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1827
1828 server['host_id'] = host_id
1829
1830
1831 #Generate dictionary for saving in db the instance resources
1832 resources = {}
1833 resources['bridged-ifaces'] = []
1834
1835 numa_dict = {}
1836 numa_dict['interfaces'] = []
1837
1838 numa_dict['interfaces'] += requirements['numa']['port_list']
1839 numa_dict['interfaces'] += requirements['numa']['sriov_list']
1840
1841 #Check bridge information
1842 unified_dataplane_iface=[]
1843 unified_dataplane_iface += requirements['numa']['port_list']
1844 unified_dataplane_iface += requirements['numa']['sriov_list']
1845
1846 for control_iface in server.get('networks', []):
1847 control_iface['net_id']=control_iface.pop('uuid')
1848 #Get the brifge name
1849 db_lock.acquire()
1850 result, content = db.get_table(FROM='nets', SELECT=('name','type', 'vlan'),WHERE={'uuid':control_iface['net_id']} )
1851 db_lock.release()
1852 if result < 0:
1853 pass
1854 elif result==0:
1855 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
1856 else:
1857 network=content[0]
1858 if control_iface.get("type", 'virtual') == 'virtual':
1859 if network['type']!='bridge_data' and network['type']!='bridge_man':
1860 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
1861 resources['bridged-ifaces'].append(control_iface)
1862 else:
1863 if network['type']!='data' and network['type']!='ptp':
1864 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
1865 #dataplane interface, look for it in the numa tree and asign this network
1866 iface_found=False
1867 for dataplane_iface in numa_dict['interfaces']:
1868 if dataplane_iface['name'] == control_iface.get("name"):
1869 if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
1870 (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
1871 (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
1872 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1873 (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
1874 dataplane_iface['uuid'] = control_iface['net_id']
1875 if dataplane_iface['dedicated'] == "no":
1876 dataplane_iface['vlan'] = network['vlan']
1877 if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
1878 dataplane_iface['mac_address'] = control_iface.get("mac_address")
1879 if control_iface.get("vpci"):
1880 dataplane_iface['vpci'] = control_iface.get("vpci")
1881 iface_found=True
1882 break
1883 if not iface_found:
1884 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")
1885
1886 resources['host_id'] = host_id
1887 resources['image_id'] = server['image_id']
1888 resources['flavor_id'] = server['flavor_id']
1889 resources['tenant_id'] = server['tenant_id']
1890 resources['ram'] = requirements['ram']
1891 resources['vcpus'] = requirements['vcpus']
1892 resources['status'] = 'CREATING'
1893
1894 if 'description' in server: resources['description'] = server['description']
1895 if 'name' in server: resources['name'] = server['name']
1896
1897 resources['extended'] = {} #optional
1898 resources['extended']['numas'] = []
1899 numa_dict['numa_id'] = numa_id
1900 numa_dict['memory'] = requirements['numa']['memory']
1901 numa_dict['cores'] = []
1902
1903 for core in cpu_pinning:
1904 numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
1905 for core in reserved_threads:
1906 numa_dict['cores'].append({'id': core})
1907 resources['extended']['numas'].append(numa_dict)
1908 if extended!=None and 'devices' in extended: #TODO allow extra devices without numa
1909 resources['extended']['devices'] = extended['devices']
1910
1911
1912 print '===================================={'
1913 print json.dumps(resources, indent=4)
1914 print '====================================}'
1915
1916 return 0, resources
1917