2e02ede0fc41a26dbdbc37caebaead37c18d5631
[osm/openvim.git] / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 This is thread that interact with the host and the libvirt to manage VM
26 One thread will be launched per host
27 '''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 from jsonschema import validate as js_v, exceptions as js_e
38 #import libvirt
39 import imp
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 #from logging import Logger
43 #import auxiliary_functions as af
44
45 #TODO: insert a logging system
46
47
48 #global lvirt_module
49 #lvirt_module=None #libvirt module is charged only if not in test mode
50
class host_thread(threading.Thread):
    # libvirt python module, loaded lazily at first __init__ with test=False,
    # so that openvim can run in test mode on hosts without python-libvirt
    lvirt_module = None
    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, develop_bridge_iface):
        '''Init a thread.
        Arguments:
            'id' number of thead
            'name' name of thread
            'host','user': host ip or name to manage and user
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': when True no ssh/libvirt connection is performed (dry-run mode)
            'image_path': remote folder where VM image files are stored
            'host_id': uuid of this host at the database
            'version': openvim version string
            'develop_mode': when True cpu pinning and hugepages are not used
            'develop_bridge_iface': bridge used for dataplane interfaces in develop mode
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test

        # dynamic import of libvirt, done only once per process (class attribute)
        if not test and not host_thread.lvirt_module:
            try:
                module_info = imp.find_module("libvirt")
                host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
            except (IOError, ImportError) as e:
                raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))


        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        self.xml_level = 0  # current indentation depth used by tab()/inc_tab()/dec_tab()
        #self.pending ={}

        self.server_status = {} #dictionary with pairs server_uuid:server_status
        self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
        self.next_update_server_status = 0 #time when must be check servers status

        self.hostinfo = None  # host description loaded from <image_path>/hostinfo.yaml

        self.queueLock = threading.Lock()   # protects accesses to taskQueue
        self.taskQueue = Queue.Queue(2000)  # tasks inserted by other threads, consumed by run()
        self.ssh_conn = None                # paramiko SSH connection to the host
95
    def ssh_connect(self):
        """Open an SSH session to self.host as self.user and store it at self.ssh_conn.

        On failure only a message is printed; self.ssh_conn is left as the
        (unconnected) client object and callers will fail later when using it.
        """
        try:
            #Connect SSH
            self.ssh_conn = paramiko.SSHClient()
            self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_conn.load_system_host_keys()
            self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": ssh_connect ssh Exception:", text
106
    def load_localinfo(self):
        """Load the local information file (<image_path>/.openvim.yaml) from the host.

        The file tracks image files copied to the host ('files') and the
        incremental files used by each server ('server_files').
        On any error, or in test mode, a default empty structure is used instead.
        """
        if not self.test:
            try:
                #Connect SSH
                self.ssh_connect()

                # ensure the image directory exists at the host
                command = 'mkdir -p ' + self.image_path
                #print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stderr.read()
                if len(content) > 0:
                    print self.name, ': command:', command, "stderr:", content

                command = 'cat ' + self.image_path + '/.openvim.yaml'
                #print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stdout.read()
                if len(content) == 0:
                    print self.name, ': command:', command, "stderr:", stderr.read()
                    raise paramiko.ssh_exception.SSHException("Error empty file ")
                self.localinfo = yaml.load(content)
                js_v(self.localinfo, localinfo_schema)  # validate structure against schema
                self.localinfo_dirty=False
                if 'server_files' not in self.localinfo:
                    self.localinfo['server_files'] = {}
                print self.name, ': localinfo load from host'
                return

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                print self.name, ": load_localinfo ssh Exception:", text
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": load_localinfo libvirt Exception:", text
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                print self.name, ": load_localinfo yaml format Exception", text
            except js_e.ValidationError as e:
                text = ""
                if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
                print self.name, ": load_localinfo format Exception:", text, e.message
            except Exception as e:
                text = str(e)
                print self.name, ": load_localinfo Exception:", text

        #not loaded, insert a default data and force saving by activating dirty flag
        self.localinfo = {'files':{}, 'server_files':{} }
        #self.localinfo_dirty=True
        self.localinfo_dirty=False
159
    def load_hostinfo(self):
        """Load the host description file (<image_path>/hostinfo.yaml) from the host.

        It contains host specific data, e.g. the 'iface_names' mapping used by
        get_local_iface_name(). On any error self.hostinfo is set to None.
        In test mode nothing is read.
        """
        if self.test:
            return;
        try:
            #Connect SSH
            self.ssh_connect()


            command = 'cat ' + self.image_path + '/hostinfo.yaml'
            #print self.name, ': command:', command
            (_, stdout, stderr) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            if len(content) == 0:
                print self.name, ': command:', command, "stderr:", stderr.read()
                raise paramiko.ssh_exception.SSHException("Error empty file ")
            self.hostinfo = yaml.load(content)
            js_v(self.hostinfo, hostinfo_schema)  # validate structure against schema
            print self.name, ': hostlinfo load from host', self.hostinfo
            return

        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": load_hostinfo ssh Exception:", text
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": load_hostinfo libvirt Exception:", text
        except yaml.YAMLError as exc:
            text = ""
            if hasattr(exc, 'problem_mark'):
                mark = exc.problem_mark
                text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
            print self.name, ": load_hostinfo yaml format Exception", text
        except js_e.ValidationError as e:
            text = ""
            if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
            print self.name, ": load_hostinfo format Exception:", text, e.message
        except Exception as e:
            text = str(e)
            print self.name, ": load_hostinfo Exception:", text

        #not loaded, insert a default data
        self.hostinfo = None
202
    def save_localinfo(self, tries=3):
        """Write self.localinfo to <image_path>/.openvim.yaml at the host and clear the dirty flag.

        'tries': number of attempts; the ssh connection is re-opened when the ssh
        session is reported as not active. In test mode only the flag is cleared.
        """
        if self.test:
            self.localinfo_dirty = False
            return

        while tries>=0:
            tries-=1

            try:
                command = 'cat > ' + self.image_path + '/.openvim.yaml'
                print self.name, ': command:', command
                (stdin, _, _) = self.ssh_conn.exec_command(command)
                # dump the yaml directly over the stdin of the remote 'cat'
                yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
                self.localinfo_dirty = False
                break #while tries

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                print self.name, ": save_localinfo ssh Exception:", text
                if "SSH session not active" in text:
                    self.ssh_connect()
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": save_localinfo libvirt Exception:", text
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                print self.name, ": save_localinfo yaml format Exception", text
            except Exception as e:
                text = str(e)
                print self.name, ": save_localinfo Exception:", text
236
    def load_servers_from_db(self):
        """Fill self.server_status with uuid:status of the instances of this host at the database.

        It also migrates the deprecated 'inc_files' localinfo section into the
        current 'server_files' layout, marking localinfo dirty when it does.
        """
        self.db_lock.acquire()
        r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
        self.db_lock.release()

        self.server_status = {}
        if r<0:
            print self.name, ": Error getting data from database:", c
            return
        for server in c:
            self.server_status[ server['uuid'] ] = server['status']

            #convert from old version to new one
            if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
                server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
                # guess qcow2 format from the file extension
                if server_files_dict['source file'][-5:] == 'qcow2':
                    server_files_dict['file format'] = 'qcow2'

                self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
        # old section fully migrated: drop it and force saving
        if 'inc_files' in self.localinfo:
            del self.localinfo['inc_files']
            self.localinfo_dirty = True
259
    def delete_unused_files(self):
        '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
        Deletes unused entries at self.loacalinfo and the corresponding local files.
        The only reason for this mismatch is the manual deletion of instances (VM) at database
        '''
        if self.test:
            return
        # python2 items() returns a copy, so deleting entries while iterating is safe
        for uuid,images in self.localinfo['server_files'].items():
            if uuid not in self.server_status:
                for localfile in images.values():
                    try:
                        print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid)
                        self.delete_file(localfile['source file'])
                    except paramiko.ssh_exception.SSHException as e:
                        print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e))
                del self.localinfo['server_files'][uuid]
                self.localinfo_dirty = True
277
278 def insert_task(self, task, *aditional):
279 try:
280 self.queueLock.acquire()
281 task = self.taskQueue.put( (task,) + aditional, timeout=5)
282 self.queueLock.release()
283 return 1, None
284 except Queue.Full:
285 return -1, "timeout inserting a task over host " + self.name
286
    def run(self):
        """Main thread loop.

        Loads host and local information, then consumes tasks from taskQueue.
        When idle it flushes dirty localinfo, refreshes the servers status every
        5 seconds and processes pending forced-off servers.
        A 'reload' task breaks to the outer loop (information is reloaded); an
        'exit' task terminates the thread.
        """
        while True:
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                # non-blocking get of the next task
                self.queueLock.acquire()
                if not self.taskQueue.empty():
                    task = self.taskQueue.get()
                else:
                    task = None
                self.queueLock.release()

                if task is None:
                    # no pending task: housekeeping, at most one action per iteration
                    now=time.time()
                    if self.localinfo_dirty:
                        self.save_localinfo()
                    elif self.next_update_server_status < now:
                        self.update_servers_status()
                        self.next_update_server_status = now + 5
                    elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                        self.server_forceoff()
                    else:
                        time.sleep(1)
                    continue

                if task[0] == 'instance':
                    print self.name, ": processing task instance", task[1]['action']
                    retry=0
                    # retry once; the second attempt is flagged as the last one
                    while retry <2:
                        retry += 1
                        r=self.action_on_server(task[1], retry==2)
                        if r>=0:
                            break
                elif task[0] == 'image':
                    pass
                elif task[0] == 'exit':
                    print self.name, ": processing task exit"
                    self.terminate()
                    return 0
                elif task[0] == 'reload':
                    print self.name, ": processing task reload terminating and relaunching"
                    self.terminate()
                    break
                elif task[0] == 'edit-iface':
                    print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3])
                    self.edit_iface(task[1], task[2], task[3])
                elif task[0] == 'restore-iface':
                    print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2])
                    self.restore_iface(task[1], task[2])
                elif task[0] == 'new-ovsbridge':
                    print self.name, ": Creating compute OVS bridge"
                    self.create_ovs_bridge()
                    break
                elif task[0] == 'new-vxlan':
                    print self.name, ": Creating vxlan tunnel=" + task[1] + ", remote ip=" + task[2]
                    self.create_ovs_vxlan_tunnel(task[1], task[2])
                    break
                elif task[0] == 'del-ovsbridge':
                    print self.name, ": Deleting OVS bridge"
                    self.delete_ovs_bridge()
                    break
                elif task[0] == 'del-vxlan':
                    print self.name, ": Deleting vxlan " + task[1] + " tunnel"
                    self.delete_ovs_vxlan_tunnel(task[1])
                    break
                elif task[0] == 'create-ovs-bridge-port':
                    print self.name, ": Adding port ovim-" + task[1] + " to OVS bridge"
                    self.create_ovs_bridge_port(task[1])
                elif task[0] == 'del-ovs-port':
                    self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                else:
                    print self.name, ": unknown task", task
361
    def server_forceoff(self, wait_until_finished=False):
        """Send a forced terminate to the servers queued at self.pending_terminate_server.

        Entries are (time, server_uuid) pairs sorted by time; an entry is only
        processed once its time has been reached. With 'wait_until_finished'
        True the method sleeps until every pending entry can be processed,
        otherwise it returns as soon as the earliest entry is not due yet.
        """
        while len(self.pending_terminate_server)>0:
            now = time.time()
            if self.pending_terminate_server[0][0]>now:
                # earliest entry not due yet
                if wait_until_finished:
                    time.sleep(1)
                    continue
                else:
                    return
            req={'uuid':self.pending_terminate_server[0][1],
                'action':{'terminate':'force'},
                'status': None
            }
            self.action_on_server(req)
            self.pending_terminate_server.pop(0)
377
    def terminate(self):
        """Finish this thread: force-off pending servers, flush dirty localinfo and close ssh."""
        try:
            self.server_forceoff(True)
            if self.localinfo_dirty:
                self.save_localinfo()
            if not self.test:
                self.ssh_conn.close()
        except Exception as e:
            text = str(e)
            print self.name, ": terminate Exception:", text
        print self.name, ": exit from host_thread"
389
390 def get_local_iface_name(self, generic_name):
391 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
392 return self.hostinfo["iface_names"][generic_name]
393 return generic_name
394
395 def create_xml_server(self, server, dev_list, server_metadata={}):
396 """Function that implements the generation of the VM XML definition.
397 Additional devices are in dev_list list
398 The main disk is upon dev_list[0]"""
399
400 #get if operating system is Windows
401 windows_os = False
402 os_type = server_metadata.get('os_type', None)
403 if os_type == None and 'metadata' in dev_list[0]:
404 os_type = dev_list[0]['metadata'].get('os_type', None)
405 if os_type != None and os_type.lower() == "windows":
406 windows_os = True
407 #get type of hard disk bus
408 bus_ide = True if windows_os else False
409 bus = server_metadata.get('bus', None)
410 if bus == None and 'metadata' in dev_list[0]:
411 bus = dev_list[0]['metadata'].get('bus', None)
412 if bus != None:
413 bus_ide = True if bus=='ide' else False
414
415 self.xml_level = 0
416
417 text = "<domain type='kvm'>"
418 #get topology
419 topo = server_metadata.get('topology', None)
420 if topo == None and 'metadata' in dev_list[0]:
421 topo = dev_list[0]['metadata'].get('topology', None)
422 #name
423 name = server.get('name','') + "_" + server['uuid']
424 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
425 text += self.inc_tab() + "<name>" + name+ "</name>"
426 #uuid
427 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
428
429 numa={}
430 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
431 numa = server['extended']['numas'][0]
432 #memory
433 use_huge = False
434 memory = int(numa.get('memory',0))*1024*1024 #in KiB
435 if memory==0:
436 memory = int(server['ram'])*1024;
437 else:
438 if not self.develop_mode:
439 use_huge = True
440 if memory==0:
441 return -1, 'No memory assigned to instance'
442 memory = str(memory)
443 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
444 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
445 if use_huge:
446 text += self.tab()+'<memoryBacking>'+ \
447 self.inc_tab() + '<hugepages/>'+ \
448 self.dec_tab()+ '</memoryBacking>'
449
450 #cpu
451 use_cpu_pinning=False
452 vcpus = int(server.get("vcpus",0))
453 cpu_pinning = []
454 if 'cores-source' in numa:
455 use_cpu_pinning=True
456 for index in range(0, len(numa['cores-source'])):
457 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
458 vcpus += 1
459 if 'threads-source' in numa:
460 use_cpu_pinning=True
461 for index in range(0, len(numa['threads-source'])):
462 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
463 vcpus += 1
464 if 'paired-threads-source' in numa:
465 use_cpu_pinning=True
466 for index in range(0, len(numa['paired-threads-source'])):
467 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
468 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
469 vcpus += 2
470
471 if use_cpu_pinning and not self.develop_mode:
472 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
473 self.tab()+'<cputune>'
474 self.xml_level += 1
475 for i in range(0, len(cpu_pinning)):
476 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
477 text += self.dec_tab()+'</cputune>'+ \
478 self.tab() + '<numatune>' +\
479 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
480 self.dec_tab() + '</numatune>'
481 else:
482 if vcpus==0:
483 return -1, "Instance without number of cpus"
484 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
485
486 #boot
487 boot_cdrom = False
488 for dev in dev_list:
489 if dev['type']=='cdrom' :
490 boot_cdrom = True
491 break
492 text += self.tab()+ '<os>' + \
493 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
494 if boot_cdrom:
495 text += self.tab() + "<boot dev='cdrom'/>"
496 text += self.tab() + "<boot dev='hd'/>" + \
497 self.dec_tab()+'</os>'
498 #features
499 text += self.tab()+'<features>'+\
500 self.inc_tab()+'<acpi/>' +\
501 self.tab()+'<apic/>' +\
502 self.tab()+'<pae/>'+ \
503 self.dec_tab() +'</features>'
504 if windows_os or topo=="oneSocket":
505 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
506 else:
507 text += self.tab() + "<cpu mode='host-model'></cpu>"
508 text += self.tab() + "<clock offset='utc'/>" +\
509 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
510 self.tab() + "<on_reboot>restart</on_reboot>" + \
511 self.tab() + "<on_crash>restart</on_crash>"
512 text += self.tab() + "<devices>" + \
513 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
514 self.tab() + "<serial type='pty'>" +\
515 self.inc_tab() + "<target port='0'/>" + \
516 self.dec_tab() + "</serial>" +\
517 self.tab() + "<console type='pty'>" + \
518 self.inc_tab()+ "<target type='serial' port='0'/>" + \
519 self.dec_tab()+'</console>'
520 if windows_os:
521 text += self.tab() + "<controller type='usb' index='0'/>" + \
522 self.tab() + "<controller type='ide' index='0'/>" + \
523 self.tab() + "<input type='mouse' bus='ps2'/>" + \
524 self.tab() + "<sound model='ich6'/>" + \
525 self.tab() + "<video>" + \
526 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
527 self.dec_tab() + "</video>" + \
528 self.tab() + "<memballoon model='virtio'/>" + \
529 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
530
531 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
532 #> self.dec_tab()+'</hostdev>\n' +\
533 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
534 if windows_os:
535 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
536 else:
537 #If image contains 'GRAPH' include graphics
538 #if 'GRAPH' in image:
539 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
540 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
541 self.dec_tab() + "</graphics>"
542
543 vd_index = 'a'
544 for dev in dev_list:
545 bus_ide_dev = bus_ide
546 if dev['type']=='cdrom' or dev['type']=='disk':
547 if dev['type']=='cdrom':
548 bus_ide_dev = True
549 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
550 if 'file format' in dev:
551 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
552 if 'source file' in dev:
553 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
554 #elif v['type'] == 'block':
555 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
556 #else:
557 # return -1, 'Unknown disk type ' + v['type']
558 vpci = dev.get('vpci',None)
559 if vpci == None:
560 vpci = dev['metadata'].get('vpci',None)
561 text += self.pci2xml(vpci)
562
563 if bus_ide_dev:
564 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
565 else:
566 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
567 text += self.dec_tab() + '</disk>'
568 vd_index = chr(ord(vd_index)+1)
569 elif dev['type']=='xml':
570 dev_text = dev['xml']
571 if 'vpci' in dev:
572 dev_text = dev_text.replace('__vpci__', dev['vpci'])
573 if 'source file' in dev:
574 dev_text = dev_text.replace('__file__', dev['source file'])
575 if 'file format' in dev:
576 dev_text = dev_text.replace('__format__', dev['source file'])
577 if '__dev__' in dev_text:
578 dev_text = dev_text.replace('__dev__', vd_index)
579 vd_index = chr(ord(vd_index)+1)
580 text += dev_text
581 else:
582 return -1, 'Unknown device type ' + dev['type']
583
584 net_nb=0
585 bridge_interfaces = server.get('networks', [])
586 for v in bridge_interfaces:
587 #Get the brifge name
588 self.db_lock.acquire()
589 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
590 self.db_lock.release()
591 if result <= 0:
592 print "create_xml_server ERROR getting nets",result, content
593 return -1, content
594 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
595 #I know it is not secure
596 #for v in sorted(desc['network interfaces'].itervalues()):
597 model = v.get("model", None)
598 if content[0]['provider']=='default':
599 text += self.tab() + "<interface type='network'>" + \
600 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
601 elif content[0]['provider'][0:7]=='macvtap':
602 text += self.tab()+"<interface type='direct'>" + \
603 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
604 self.tab() + "<target dev='macvtap0'/>"
605 if windows_os:
606 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
607 elif model==None:
608 model = "virtio"
609 elif content[0]['provider'][0:6]=='bridge':
610 text += self.tab() + "<interface type='bridge'>" + \
611 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
612 if windows_os:
613 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
614 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
615 elif model==None:
616 model = "virtio"
617 elif content[0]['provider'][0:3] == "OVS":
618 vlan = content[0]['provider'].replace('OVS:', '')
619 text += self.tab() + "<interface type='bridge'>" + \
620 self.inc_tab() + "<source bridge='ovim-" + vlan + "'/>"
621 else:
622 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
623 if model!=None:
624 text += self.tab() + "<model type='" +model+ "'/>"
625 if v.get('mac_address', None) != None:
626 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
627 text += self.pci2xml(v.get('vpci',None))
628 text += self.dec_tab()+'</interface>'
629
630 net_nb += 1
631
632 interfaces = numa.get('interfaces', [])
633
634 net_nb=0
635 for v in interfaces:
636 if self.develop_mode: #map these interfaces to bridges
637 text += self.tab() + "<interface type='bridge'>" + \
638 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
639 if windows_os:
640 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
641 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
642 else:
643 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
644 if v.get('mac_address', None) != None:
645 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
646 text += self.pci2xml(v.get('vpci',None))
647 text += self.dec_tab()+'</interface>'
648 continue
649
650 if v['dedicated'] == 'yes': #passthrought
651 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
652 self.inc_tab() + "<source>"
653 self.inc_tab()
654 text += self.pci2xml(v['source'])
655 text += self.dec_tab()+'</source>'
656 text += self.pci2xml(v.get('vpci',None))
657 if windows_os:
658 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
659 text += self.dec_tab()+'</hostdev>'
660 net_nb += 1
661 else: #sriov_interfaces
662 #skip not connected interfaces
663 if v.get("net_id") == None:
664 continue
665 text += self.tab() + "<interface type='hostdev' managed='yes'>"
666 self.inc_tab()
667 if v.get('mac_address', None) != None:
668 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
669 text+= self.tab()+'<source>'
670 self.inc_tab()
671 text += self.pci2xml(v['source'])
672 text += self.dec_tab()+'</source>'
673 if v.get('vlan',None) != None:
674 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
675 text += self.pci2xml(v.get('vpci',None))
676 if windows_os:
677 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
678 text += self.dec_tab()+'</interface>'
679
680
681 text += self.dec_tab()+'</devices>'+\
682 self.dec_tab()+'</domain>'
683 return 0, text
684
685 def pci2xml(self, pci):
686 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
687 alows an empty pci text'''
688 if pci is None:
689 return ""
690 first_part = pci.split(':')
691 second_part = first_part[2].split('.')
692 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
693 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
694 "' function='0x" + second_part[1] + "'/>"
695
    def tab(self):
        """Return a newline followed by the indentation that corresponds to xml_level"""
        return "\n" + (' '*self.xml_level)
699
700 def inc_tab(self):
701 """Increment and return indentation according to xml_level"""
702 self.xml_level += 1
703 return self.tab()
704
705 def dec_tab(self):
706 """Decrement and return indentation according to xml_level"""
707 self.xml_level -= 1
708 return self.tab()
709
710 def create_ovs_bridge(self):
711 """
712 Create a bridge in compute OVS to allocate VMs
713 :return: True if success
714 """
715 if self.test:
716 return
717 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
718 print self.name, ': command:', command
719 (_, stdout, _) = self.ssh_conn.exec_command(command)
720 content = stdout.read()
721 if len(content) == 0:
722 return True
723 else:
724 return False
725
726 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
727 """
728 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
729 :param vlan: vlan port id
730 :param net_uuid: network id
731 :return:
732 """
733
734 command = 'sudo ovs-vsctl del-port br-int ovim-' + vlan
735 print self.name, ': command:', command
736 (_, stdout, _) = self.ssh_conn.exec_command(command)
737 content = stdout.read()
738 if len(content) == 0:
739 return True
740 else:
741 return False
742
743 def is_port_free(self, vlan, net_uuid):
744 """
745 Check if there not ovs ports of a network in a compute host.
746 :param vlan: vlan port id
747 :param net_uuid: network id
748 :return: True if is not free
749 """
750 self.db_lock.acquire()
751 result, content = self.db.get_table(
752 FROM='ports as p join instances as i on p.instance_id=i.uuid',
753 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
754 )
755 self.db_lock.release()
756
757 if content > 0:
758 return False
759 else:
760 return True
761
762 def add_port_to_ovs_bridge(self, vlan):
763 """
764 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
765 :param vlan: vlan port id
766 :return:
767 """
768 command = 'sudo ovs-vsctl add-port br-int ovim-' + vlan + ' tag=' + vlan
769 print self.name, ': command:', command
770 (_, stdout, _) = self.ssh_conn.exec_command(command)
771 content = stdout.read()
772 if len(content) == 0:
773 return True
774 else:
775 return False
776
777 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
778 """
779 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
780 :param vlan:
781 :param net_uuid:
782 :return: True if success
783 """
784 if self.test:
785 return
786 if not self.is_port_free(vlan, net_uuid):
787 return True
788 self.delete_port_to_ovs_bridge(vlan, net_uuid)
789 self.delete_linux_bridge(vlan)
790 return True
791
792 def delete_linux_bridge(self, vlan):
793 """
794 Delete a linux bridge in a scpecific compute.
795 :param vlan: vlan port id
796 :return: True if success
797 """
798 command = 'sudo ifconfig ovim-' + vlan + ' down && sudo brctl delbr ovim-' + vlan
799 print self.name, ': command:', command
800 (_, stdout, _) = self.ssh_conn.exec_command(command)
801 content = stdout.read()
802 if len(content) == 0:
803 return True
804 else:
805 return False
806
    def create_ovs_bridge_port(self, vlan):
        """
        Generate a linux bridge for the vlan and attach it as a port of the OVS bridge.
        Does nothing in test mode.
        :param vlan: vlan port id
        :return: None
        """
        if self.test:
            return
        self.create_linux_bridge(vlan)
        self.add_port_to_ovs_bridge(vlan)
817
818 def create_linux_bridge(self, vlan):
819 """
820 Create a linux bridge with STP active
821 :param vlan: netowrk vlan id
822 :return:
823 """
824 command = 'sudo brctl addbr ovim-' + vlan + ' && sudo ifconfig ovim-' + vlan + ' up'
825 print self.name, ': command:', command
826 (_, stdout, _) = self.ssh_conn.exec_command(command)
827 content = stdout.read()
828
829 if len(content) != 0:
830 return False
831
832 command = 'sudo brctl stp ovim-' + vlan + ' on'
833 print self.name, ': command:', command
834 (_, stdout, _) = self.ssh_conn.exec_command(command)
835 content = stdout.read()
836
837 if len(content) == 0:
838 return True
839 else:
840 return False
841
842 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
843 """
844 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
845 :param vxlan_interface: vlxan inteface name.
846 :param remote_ip: tunnel endpoint remote compute ip.
847 :return:
848 """
849 if self.test:
850 return
851 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
852 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
853 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
854 print self.name, ': command:', command
855 (_, stdout, _) = self.ssh_conn.exec_command(command)
856 content = stdout.read()
857 print content
858 if len(content) == 0:
859 return True
860 else:
861 return False
862
863 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
864 """
865 Delete a vlxan tunnel port from a OVS brdige.
866 :param vxlan_interface: vlxan name to be delete it.
867 :return: True if success.
868 """
869 if self.test:
870 return
871 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
872 print self.name, ': command:', command
873 (_, stdout, _) = self.ssh_conn.exec_command(command)
874 content = stdout.read()
875 print content
876 if len(content) == 0:
877 return True
878 else:
879 return False
880
881 def delete_ovs_bridge(self):
882 """
883 Delete a OVS bridge from a compute.
884 :return: True if success
885 """
886 if self.test:
887 return
888 command = 'sudo ovs-vsctl del-br br-int'
889 print self.name, ': command:', command
890 (_, stdout, _) = self.ssh_conn.exec_command(command)
891 content = stdout.read()
892 if len(content) == 0:
893 return True
894 else:
895 return False
896
897 def get_file_info(self, path):
898 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
899 print self.name, ': command:', command
900 (_, stdout, _) = self.ssh_conn.exec_command(command)
901 content = stdout.read()
902 if len(content) == 0:
903 return None # file does not exist
904 else:
905 return content.split(" ") #(permission, 1, owner, group, size, date, file)
906
907 def qemu_get_info(self, path):
908 command = 'qemu-img info ' + path
909 print self.name, ': command:', command
910 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
911 content = stdout.read()
912 if len(content) == 0:
913 error = stderr.read()
914 print self.name, ": get_qemu_info error ", error
915 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
916 else:
917 try:
918 return yaml.load(content)
919 except yaml.YAMLError as exc:
920 text = ""
921 if hasattr(exc, 'problem_mark'):
922 mark = exc.problem_mark
923 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
924 print self.name, ": get_qemu_info yaml format Exception", text
925 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
926
927 def qemu_change_backing(self, inc_file, new_backing_file):
928 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
929 print self.name, ': command:', command
930 (_, _, stderr) = self.ssh_conn.exec_command(command)
931 content = stderr.read()
932 if len(content) == 0:
933 return 0
934 else:
935 print self.name, ": qemu_change_backing error: ", content
936 return -1
937
938 def get_notused_filename(self, proposed_name, suffix=''):
939 '''Look for a non existing file_name in the host
940 proposed_name: proposed file name, includes path
941 suffix: suffix to be added to the name, before the extention
942 '''
943 extension = proposed_name.rfind(".")
944 slash = proposed_name.rfind("/")
945 if extension < 0 or extension < slash: # no extension
946 extension = len(proposed_name)
947 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
948 info = self.get_file_info(target_name)
949 if info is None:
950 return target_name
951
952 index=0
953 while info is not None:
954 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
955 index+=1
956 info = self.get_file_info(target_name)
957 return target_name
958
959 def get_notused_path(self, proposed_path, suffix=''):
960 '''Look for a non existing path at database for images
961 proposed_path: proposed file name, includes path
962 suffix: suffix to be added to the name, before the extention
963 '''
964 extension = proposed_path.rfind(".")
965 if extension < 0:
966 extension = len(proposed_path)
967 if suffix != None:
968 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
969 index=0
970 while True:
971 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
972 if r<=0:
973 return target_path
974 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
975 index+=1
976
977
978 def delete_file(self, file_name):
979 command = 'rm -f '+file_name
980 print self.name, ': command:', command
981 (_, _, stderr) = self.ssh_conn.exec_command(command)
982 error_msg = stderr.read()
983 if len(error_msg) > 0:
984 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
985
986 def copy_file(self, source, destination, perserve_time=True):
987 if source[0:4]=="http":
988 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
989 dst=destination, src=source, dst_result=destination + ".result" )
990 else:
991 command = 'cp --no-preserve=mode'
992 if perserve_time:
993 command += ' --preserve=timestamps'
994 command += " '{}' '{}'".format(source, destination)
995 print self.name, ': command:', command
996 (_, _, stderr) = self.ssh_conn.exec_command(command)
997 error_msg = stderr.read()
998 if len(error_msg) > 0:
999 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1000
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to local folder and recursively 
            copy the backing files in case the remote file is incremental
            Read and/or modified self.localinfo['files'] that contain the
            unmodified copies of images in the local path
            params:
                remote_file: path of remote file
                use_incremental: None (leave the decision to this function), True, False
            return:
                local_file: name of local file
                qemu_info: dict with qemu information of local file
                use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''
        
        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            # short-circuit keeps this safe when qemu_remote_info was not obtained (http source)
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                # indexes [4]/[5] are size and modification date from get_file_info
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    # best-effort cleanup; a stale entry only forces a re-copy later
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None
                

        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            if use_incremental_out:
                # remember the unmodified local copy so future launches can reuse it
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)
            
        return local_file, qemu_info, use_incremental_out
1068
    def launch_server(self, conn, server, rebuild=False, domain=None):
        '''Create (or resume/rebuild) a libvirt domain for 'server' at this host.
        params:
            conn: libvirt connection to this host
            server: instance dict; uses 'uuid', 'paused', 'image_id', 'metadata'
            rebuild: when True previous incremental files are deleted and the
                domain is created again from the image
            domain: already existing libvirt domain; when provided and not
                rebuilding it is just resumed
        return: (0, 'Success') or (<0, error_text)
        '''
        if self.test:
            time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
            return 0, 'Success'

        server_id = server['uuid']
        paused = server.get('paused','no')
        try:
            if domain!=None and rebuild==False:
                # domain already defined at libvirt: resuming is enough
                domain.resume()
                #self.server_status[server_id] = 'ACTIVE'
                return 0, 'Success'

            self.db_lock.acquire()
            result, server_data = self.db.get_instance(server_id)
            self.db_lock.release()
            if result <= 0:
                print self.name, ": launch_server ERROR getting server from DB",result, server_data
                return result, server_data

            #0: get image metadata
            server_metadata = server.get('metadata', {})
            use_incremental = None

            if "use_incremental" in server_metadata:
                use_incremental = False if server_metadata["use_incremental"]=="no" else True

            server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
            if rebuild:
                #delete previous incremental files
                for file_ in server_host_files.values():
                    self.delete_file(file_['source file'] )
                server_host_files={}

            #1: obtain aditional devices (disks)
            #Put as first device the main disk
            devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
            if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
                devices += server_data['extended']['devices']

            for dev in devices:
                if dev['image_id'] == None:
                    continue

                self.db_lock.acquire()
                result, content = self.db.get_table(FROM='images', SELECT=('path','metadata'),WHERE={'uuid':dev['image_id']} )
                self.db_lock.release()
                if result <= 0:
                    error_text = "ERROR", result, content, "when getting image", dev['image_id']
                    print self.name, ": launch_server", error_text
                    return -1, error_text
                if content[0]['metadata'] is not None:
                    dev['metadata'] = json.loads(content[0]['metadata'])
                else:
                    dev['metadata'] = {}

                if dev['image_id'] in server_host_files:
                    # image already copied to this host for this server: reuse it
                    dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
                    dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
                    continue

                #2: copy image to host
                remote_file = content[0]['path']
                use_incremental_image = use_incremental
                if dev['metadata'].get("use_incremental") == "no":
                    use_incremental_image = False
                local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)

                #create incremental image
                if use_incremental_image:
                    local_file_inc = self.get_notused_filename(local_file, '.inc')
                    command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
                    print 'command:', command
                    (_, _, stderr) = self.ssh_conn.exec_command(command)
                    error_msg = stderr.read()
                    if len(error_msg) > 0:
                        raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
                    local_file = local_file_inc
                    qemu_info = {'file format':'qcow2'}

                server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}

                dev['source file'] = local_file
                dev['file format'] = qemu_info['file format']

            # persist the per-server file mapping so a later delete can clean up
            self.localinfo['server_files'][ server['uuid'] ] = server_host_files
            self.localinfo_dirty = True

            #3 Create XML
            result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
            if result <0:
                print self.name, ": create xml server error:", xml
                return -2, xml
            print self.name, ": create xml:", xml
            atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
            #4 Start the domain
            if not rebuild: #ensures that any pending destroying server is done
                self.server_forceoff(True)
            #print self.name, ": launching instance" #, xml
            conn.createXML(xml, atribute)
            #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'

            return 0, 'Success'

        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
            if "SSH session not active" in text:
                # try to recover the ssh session for the next operation
                self.ssh_connect()
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
        except Exception as e:
            text = str(e)
            print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
        return -1, text
1185
    def update_servers_status(self):
        """Poll libvirt for the state of all domains at this host and update
        self.server_status and the 'instances' DB table when a state changed.
        Does nothing in test mode or when no server is being tracked.
        """
        # # virDomainState
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7   #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            # a fresh connection per poll; closed below
            conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            domains= conn.listAllDomains()
            domain_dict={}
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                # map libvirt states to openvim instance status
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            print self.name, ": get_state() Exception '", e.get_error_message()
            return

        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                # domain no longer defined at libvirt: consider it stopped
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            #change status
            print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            if r>=0:
                # only remember the new status if the DB update did not fail
                self.server_status[server_id] = new_status
1245
    def action_on_server(self, req, last_retry=True):
        '''Perform an action on a req
        Attributes:
            req: dictionary that contain:
                server properties: 'uuid','name','tenant_id','status'
                action: 'action'
                host properties: 'user', 'ip_name'
            last_retry: when False and the action ends in ERROR the database is
                not updated so the caller can retry
        return (error, text)
             0: No error. VM is updated to new state,
            -1: Invalid action, as trying to pause a PAUSED VM
            -2: Error accessing host
            -3: VM nor present
            -4: Error at DB access
            -5: Error while trying to perform action. VM is updated to ERROR
        '''
        server_id = req['uuid']
        conn = None
        new_status = None
        old_status = req['status']
        last_error = None

        if self.test:
            # test mode: no host access, just simulate the state transitions
            if 'terminate' in req['action']:
                new_status = 'deleted'
            elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
                if req['status']!='ERROR':
                    time.sleep(5)
                    new_status = 'INACTIVE'
            elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
            elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
            elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
            elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
            elif 'rebuild' in req['action']:
                time.sleep(random.randint(20,150))
                new_status = 'ACTIVE'
            elif 'createImage' in req['action']:
                time.sleep(5)
                self.create_image(None, req)
        else:
            try:
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                try:
                    dom = conn.lookupByUUIDString(server_id)
                except host_thread.lvirt_module.libvirtError as e:
                    text = e.get_error_message()
                    # "domain not found" is expected for stopped servers; anything else is fatal
                    if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                        dom = None
                    else:
                        print self.name, ": action_on_server(",server_id,") libvirt exception:", text
                        raise e

                if 'forceOff' in req['action']:
                    if dom == None:
                        print self.name, ": action_on_server(",server_id,") domain not running"
                    else:
                        try:
                            print self.name, ": sending DESTROY to server", server_id
                            dom.destroy()
                        except Exception as e:
                            # destroying an already stopped domain is not an error
                            if "domain is not running" not in e.get_error_message():
                                print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
                                last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                                new_status = 'ERROR'

                elif 'terminate' in req['action']:
                    if dom == None:
                        print self.name, ": action_on_server(",server_id,") domain not running"
                        new_status = 'deleted'
                    else:
                        try:
                            if req['action']['terminate'] == 'force':
                                print self.name, ": sending DESTROY to server", server_id
                                dom.destroy()
                                new_status = 'deleted'
                            else:
                                # graceful shutdown; deletion is completed later by server_forceoff
                                print self.name, ": sending SHUTDOWN to server", server_id
                                dom.shutdown()
                                self.pending_terminate_server.append( (time.time()+10,server_id) )
                        except Exception as e:
                            print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
                            last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                            new_status = 'ERROR'
                            if "domain is not running" in e.get_error_message():
                                try:
                                    dom.undefine()
                                    new_status = 'deleted'
                                except Exception:
                                    print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
                                    # NOTE(review): the trailing comma makes last_error a tuple, not a
                                    # string, unlike every other assignment — looks like a latent bug
                                    last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
                                    #Exception: 'virDomainDetachDevice() failed'
                    if new_status=='deleted':
                        # clean local bookkeeping and per-server image files
                        if server_id in self.server_status:
                            del self.server_status[server_id]
                        if req['uuid'] in self.localinfo['server_files']:
                            for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
                                try:
                                    self.delete_file(file_['source file'])
                                except Exception:
                                    pass
                            del self.localinfo['server_files'][ req['uuid'] ]
                            self.localinfo_dirty = True

                elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
                    try:
                        if dom == None:
                            print self.name, ": action_on_server(",server_id,") domain not running"
                        else:
                            dom.shutdown()
#                        new_status = 'INACTIVE'
                        #TODO: check status for changing at database
                    except Exception as e:
                        new_status = 'ERROR'
                        print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
                        last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()

                elif 'rebuild' in req['action']:
                    if dom != None:
                        dom.destroy()
                    r = self.launch_server(conn, req, True, None)
                    if r[0] <0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'
                elif 'start' in req['action']:
                    # The instance is only create in DB but not yet at libvirt domain, needs to be create
                    rebuild = True if req['action']['start'] == 'rebuild' else False
                    r = self.launch_server(conn, req, rebuild, dom)
                    if r[0] <0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'

                elif 'resume' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.resume()
#                            new_status = 'ACTIVE'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()

                elif 'pause' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.suspend()
#                            new_status = 'PAUSED'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()

                elif 'reboot' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.reboot()
                        print self.name, ": action_on_server(",server_id,") reboot:"
                        #new_status = 'ACTIVE'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
                elif 'createImage' in req['action']:
                    self.create_image(dom, req)


                conn.close()
            except host_thread.lvirt_module.libvirtError as e:
                if conn is not None: conn.close()
                text = e.get_error_message()
                new_status = "ERROR"
                last_error = text
                print self.name, ": action_on_server(",server_id,") Exception '", text
                if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                    print self.name, ": action_on_server(",server_id,") Exception removed from host"
        #end of if self.test
        if new_status ==  None:
            return 1

        print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
        UPDATE = {'progress':100, 'status':new_status}

        if new_status=='ERROR':
            if not last_retry:  #if there will be another retry do not update database
                return -1
            elif 'terminate' in req['action']:
                #PUT a log in the database
                print self.name, ": PANIC deleting server", server_id, last_error
                self.db_lock.acquire()
                self.db.new_row('logs',
                            {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
                             'description':'PANIC deleting server from host '+self.name+': '+last_error}
                        )
                self.db_lock.release()
                if server_id in self.server_status:
                    del self.server_status[server_id]
                return -1
            else:
                UPDATE['last_error'] = last_error
        if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
            self.db_lock.acquire()
            self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
            self.server_status[server_id] = new_status
            self.db_lock.release()
        if new_status == 'ERROR':
            return -1
        return 1
1455
1456
1457 def restore_iface(self, name, mac, lib_conn=None):
1458 ''' make an ifdown, ifup to restore default parameter of na interface
1459 Params:
1460 mac: mac address of the interface
1461 lib_conn: connection to the libvirt, if None a new connection is created
1462 Return 0,None if ok, -1,text if fails
1463 '''
1464 conn=None
1465 ret = 0
1466 error_text=None
1467 if self.test:
1468 print self.name, ": restore_iface '%s' %s" % (name, mac)
1469 return 0, None
1470 try:
1471 if not lib_conn:
1472 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1473 else:
1474 conn = lib_conn
1475
1476 #wait to the pending VM deletion
1477 #TODO.Revise self.server_forceoff(True)
1478
1479 iface = conn.interfaceLookupByMACString(mac)
1480 iface.destroy()
1481 iface.create()
1482 print self.name, ": restore_iface '%s' %s" % (name, mac)
1483 except host_thread.lvirt_module.libvirtError as e:
1484 error_text = e.get_error_message()
1485 print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
1486 ret=-1
1487 finally:
1488 if lib_conn is None and conn is not None:
1489 conn.close()
1490 return ret, error_text
1491
1492
    def create_image(self,dom, req):
        """Create a new image from a server disk and register it at the database.
        :param dom: libvirt domain of the server (unused in test mode)
        :param req: request dict; uses req['action']['createImage'] ('path' or
            'source'/'name') and req['new_image']['uuid']
        """
        if self.test:
            # test mode: only compute the destination path and mark the image ACTIVE
            if 'path' in req['action']['createImage']:
                file_dst = req['action']['createImage']['path']
            else:
                createImage=req['action']['createImage']
                img_name= createImage['source']['path']
                index=img_name.rfind('/')
                file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
            image_status='ACTIVE'
        else:
            # one retry allowed, only to recover a dropped ssh session
            for retry in (0,1):
                try:
                    server_id = req['uuid']
                    createImage=req['action']['createImage']
                    file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
                    if 'path' in req['action']['createImage']:
                        file_dst = req['action']['createImage']['path']
                    else:
                        img_name= createImage['source']['path']
                        index=img_name.rfind('/')
                        file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')

                    self.copy_file(file_orig, file_dst)
                    qemu_info = self.qemu_get_info(file_orig)
                    if 'backing file' in qemu_info:
                        # point the copy at the original (repository) backing file path
                        for k,v in self.localinfo['files'].items():
                            if v==qemu_info['backing file']:
                                self.qemu_change_backing(file_dst, k)
                                break
                    image_status='ACTIVE'
                    break
                except paramiko.ssh_exception.SSHException as e:
                    image_status='ERROR'
                    error_text = e.args[0]
                    print self.name, "': create_image(",server_id,") ssh Exception:", error_text
                    if "SSH session not active" in error_text and retry==0:
                        self.ssh_connect()
                except Exception as e:
                    image_status='ERROR'
                    error_text = str(e)
                    print self.name, "': create_image(",server_id,") Exception:", error_text

        #TODO insert a last_error at database
        self.db_lock.acquire()
        self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
                {'uuid':req['new_image']['uuid']}, log=True)
        self.db_lock.release()
1541
    def edit_iface(self, port_id, old_net, new_net):
        """Move a SR-IOV (VF) interface between networks by detaching and
        re-attaching it to the running domain with the proper vlan tag.
        :param port_id: uuid of the port at the 'ports' table
        :param old_net: when truthy, detach the interface from its current net
        :param new_net: when truthy, attach the interface with the new vlan
        """
        #This action imply remove and insert interface to put proper parameters
        if self.test:
            time.sleep(1)
        else:
        #get iface details
            self.db_lock.acquire()
            r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                    WHERE={'port_id': port_id})
            self.db_lock.release()
            if r<0:
                print self.name, ": edit_iface(",port_id,") DDBB error:", c
                return
            elif r==0:
                print self.name, ": edit_iface(",port_id,") por not found"
                return
            port=c[0]
            if port["model"]!="VF":
                print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
                return
            #create xml detach file
            xml=[]
            self.xml_level = 2
            xml.append("<interface type='hostdev' managed='yes'>")
            xml.append(" <mac address='" +port['mac']+ "'/>")
            xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
            xml.append('</interface>')


            try:
                conn=None
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                dom = conn.lookupByUUIDString(port["instance_id"])
                if old_net:
                    text="\n".join(xml)
                    print self.name, ": edit_iface detaching SRIOV interface", text
                    dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
                if new_net:
                    # reuse the xml list: replace closing tag with the vlan element and re-close
                    xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
                    self.xml_level = 1
                    xml.append(self.pci2xml(port.get('vpci',None)) )
                    xml.append('</interface>')
                    text="\n".join(xml)
                    print self.name, ": edit_iface attaching SRIOV interface", text
                    dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text

            finally:
                if conn is not None: conn.close()
1594
1595
1596 def create_server(server, db, db_lock, only_of_ports):
1597 #print "server"
1598 #print "server"
1599 #print server
1600 #print "server"
1601 #print "server"
1602 #try:
1603 # host_id = server.get('host_id', None)
1604 extended = server.get('extended', None)
1605
1606 # print '----------------------'
1607 # print json.dumps(extended, indent=4)
1608
1609 requirements={}
1610 requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1611 requirements['ram'] = server['flavor'].get('ram', 0)
1612 if requirements['ram']== None:
1613 requirements['ram'] = 0
1614 requirements['vcpus'] = server['flavor'].get('vcpus', 0)
1615 if requirements['vcpus']== None:
1616 requirements['vcpus'] = 0
1617 #If extended is not defined get requirements from flavor
1618 if extended is None:
1619 #If extended is defined in flavor convert to dictionary and use it
1620 if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
1621 json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
1622 extended = json.loads(json_acceptable_string)
1623 else:
1624 extended = None
1625 #print json.dumps(extended, indent=4)
1626
1627 #For simplicity only one numa VM are supported in the initial implementation
1628 if extended != None:
1629 numas = extended.get('numas', [])
1630 if len(numas)>1:
1631 return (-2, "Multi-NUMA VMs are not supported yet")
1632 #elif len(numas)<1:
1633 # return (-1, "At least one numa must be specified")
1634
1635 #a for loop is used in order to be ready to multi-NUMA VMs
1636 request = []
1637 for numa in numas:
1638 numa_req = {}
1639 numa_req['memory'] = numa.get('memory', 0)
1640 if 'cores' in numa:
1641 numa_req['proc_req_nb'] = numa['cores'] #number of cores or threads to be reserved
1642 numa_req['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1643 numa_req['proc_req_list'] = numa.get('cores-id', None) #list of ids to be assigned to the cores or threads
1644 elif 'paired-threads' in numa:
1645 numa_req['proc_req_nb'] = numa['paired-threads']
1646 numa_req['proc_req_type'] = 'paired-threads'
1647 numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
1648 elif 'threads' in numa:
1649 numa_req['proc_req_nb'] = numa['threads']
1650 numa_req['proc_req_type'] = 'threads'
1651 numa_req['proc_req_list'] = numa.get('threads-id', None)
1652 else:
1653 numa_req['proc_req_nb'] = 0 # by default
1654 numa_req['proc_req_type'] = 'threads'
1655
1656
1657
1658 #Generate a list of sriov and another for physical interfaces
1659 interfaces = numa.get('interfaces', [])
1660 sriov_list = []
1661 port_list = []
1662 for iface in interfaces:
1663 iface['bandwidth'] = int(iface['bandwidth'])
1664 if iface['dedicated'][:3]=='yes':
1665 port_list.append(iface)
1666 else:
1667 sriov_list.append(iface)
1668
1669 #Save lists ordered from more restrictive to less bw requirements
1670 numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
1671 numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)
1672
1673
1674 request.append(numa_req)
1675
1676 # print "----------\n"+json.dumps(request[0], indent=4)
1677 # print '----------\n\n'
1678
1679 #Search in db for an appropriate numa for each requested numa
1680 #at the moment multi-NUMA VMs are not supported
1681 if len(request)>0:
1682 requirements['numa'].update(request[0])
1683 if requirements['numa']['memory']>0:
1684 requirements['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1685 elif requirements['ram']==0:
1686 return (-1, "Memory information not set neither at extended field not at ram")
1687 if requirements['numa']['proc_req_nb']>0:
1688 requirements['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1689 elif requirements['vcpus']==0:
1690 return (-1, "Processor information not set neither at extended field not at vcpus")
1691
1692
1693 db_lock.acquire()
1694 result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
1695 db_lock.release()
1696
1697 if result == -1:
1698 return (-1, content)
1699
1700 numa_id = content['numa_id']
1701 host_id = content['host_id']
1702
1703 #obtain threads_id and calculate pinning
1704 cpu_pinning = []
1705 reserved_threads=[]
1706 if requirements['numa']['proc_req_nb']>0:
1707 db_lock.acquire()
1708 result, content = db.get_table(FROM='resources_core',
1709 SELECT=('id','core_id','thread_id'),
1710 WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
1711 db_lock.release()
1712 if result <= 0:
1713 print content
1714 return -1, content
1715
1716 #convert rows to a dictionary indexed by core_id
1717 cores_dict = {}
1718 for row in content:
1719 if not row['core_id'] in cores_dict:
1720 cores_dict[row['core_id']] = []
1721 cores_dict[row['core_id']].append([row['thread_id'],row['id']])
1722
1723 #In case full cores are requested
1724 paired = 'N'
1725 if requirements['numa']['proc_req_type'] == 'cores':
1726 #Get/create the list of the vcpu_ids
1727 vcpu_id_list = requirements['numa']['proc_req_list']
1728 if vcpu_id_list == None:
1729 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1730
1731 for threads in cores_dict.itervalues():
1732 #we need full cores
1733 if len(threads) != 2:
1734 continue
1735
1736 #set pinning for the first thread
1737 cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )
1738
1739 #reserve so it is not used the second thread
1740 reserved_threads.append(threads[1][1])
1741
1742 if len(vcpu_id_list) == 0:
1743 break
1744
1745 #In case paired threads are requested
1746 elif requirements['numa']['proc_req_type'] == 'paired-threads':
1747 paired = 'Y'
1748 #Get/create the list of the vcpu_ids
1749 if requirements['numa']['proc_req_list'] != None:
1750 vcpu_id_list = []
1751 for pair in requirements['numa']['proc_req_list']:
1752 if len(pair)!=2:
1753 return -1, "Field paired-threads-id not properly specified"
1754 return
1755 vcpu_id_list.append(pair[0])
1756 vcpu_id_list.append(pair[1])
1757 else:
1758 vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))
1759
1760 for threads in cores_dict.itervalues():
1761 #we need full cores
1762 if len(threads) != 2:
1763 continue
1764 #set pinning for the first thread
1765 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1766
1767 #set pinning for the second thread
1768 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1769
1770 if len(vcpu_id_list) == 0:
1771 break
1772
1773 #In case normal threads are requested
1774 elif requirements['numa']['proc_req_type'] == 'threads':
1775 #Get/create the list of the vcpu_ids
1776 vcpu_id_list = requirements['numa']['proc_req_list']
1777 if vcpu_id_list == None:
1778 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1779
1780 for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
1781 threads = cores_dict[threads_index]
1782 #set pinning for the first thread
1783 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1784
1785 #if exists, set pinning for the second thread
1786 if len(threads) == 2 and len(vcpu_id_list) != 0:
1787 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1788
1789 if len(vcpu_id_list) == 0:
1790 break
1791
1792 #Get the source pci addresses for the selected numa
1793 used_sriov_ports = []
1794 for port in requirements['numa']['sriov_list']:
1795 db_lock.acquire()
1796 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1797 db_lock.release()
1798 if result <= 0:
1799 print content
1800 return -1, content
1801 for row in content:
1802 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1803 continue
1804 port['pci'] = row['pci']
1805 if 'mac_address' not in port:
1806 port['mac_address'] = row['mac']
1807 del port['mac']
1808 port['port_id']=row['id']
1809 port['Mbps_used'] = port['bandwidth']
1810 used_sriov_ports.append(row['id'])
1811 break
1812
1813 for port in requirements['numa']['port_list']:
1814 port['Mbps_used'] = None
1815 if port['dedicated'] != "yes:sriov":
1816 port['mac_address'] = port['mac']
1817 del port['mac']
1818 continue
1819 db_lock.acquire()
1820 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1821 db_lock.release()
1822 if result <= 0:
1823 print content
1824 return -1, content
1825 port['Mbps_used'] = content[0]['Mbps']
1826 for row in content:
1827 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1828 continue
1829 port['pci'] = row['pci']
1830 if 'mac_address' not in port:
1831 port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports
1832 del port['mac']
1833 port['port_id']=row['id']
1834 used_sriov_ports.append(row['id'])
1835 break
1836
1837 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1838 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1839
1840 server['host_id'] = host_id
1841
1842
1843 #Generate dictionary for saving in db the instance resources
1844 resources = {}
1845 resources['bridged-ifaces'] = []
1846
1847 numa_dict = {}
1848 numa_dict['interfaces'] = []
1849
1850 numa_dict['interfaces'] += requirements['numa']['port_list']
1851 numa_dict['interfaces'] += requirements['numa']['sriov_list']
1852
1853 #Check bridge information
1854 unified_dataplane_iface=[]
1855 unified_dataplane_iface += requirements['numa']['port_list']
1856 unified_dataplane_iface += requirements['numa']['sriov_list']
1857
1858 for control_iface in server.get('networks', []):
1859 control_iface['net_id']=control_iface.pop('uuid')
1860 #Get the brifge name
1861 db_lock.acquire()
1862 result, content = db.get_table(FROM = 'nets',
1863 SELECT = ('name','type', 'vlan', 'provider'),
1864 WHERE = {'uuid':control_iface['net_id']})
1865 db_lock.release()
1866 if result < 0:
1867 pass
1868 elif result==0:
1869 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
1870 else:
1871 network=content[0]
1872 if control_iface.get("type", 'virtual') == 'virtual':
1873 if network['type']!='bridge_data' and network['type']!='bridge_man':
1874 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
1875 resources['bridged-ifaces'].append(control_iface)
1876 if network.get("provider") and network["provider"][0:3] == "OVS":
1877 control_iface["type"] = "instance:ovs"
1878 else:
1879 control_iface["type"] = "instance:bridge"
1880 if network.get("vlan"):
1881 control_iface["vlan"] = network["vlan"]
1882 else:
1883 if network['type']!='data' and network['type']!='ptp':
1884 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
1885 #dataplane interface, look for it in the numa tree and asign this network
1886 iface_found=False
1887 for dataplane_iface in numa_dict['interfaces']:
1888 if dataplane_iface['name'] == control_iface.get("name"):
1889 if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
1890 (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
1891 (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
1892 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1893 (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
1894 dataplane_iface['uuid'] = control_iface['net_id']
1895 if dataplane_iface['dedicated'] == "no":
1896 dataplane_iface['vlan'] = network['vlan']
1897 if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
1898 dataplane_iface['mac_address'] = control_iface.get("mac_address")
1899 if control_iface.get("vpci"):
1900 dataplane_iface['vpci'] = control_iface.get("vpci")
1901 iface_found=True
1902 break
1903 if not iface_found:
1904 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")
1905
1906 resources['host_id'] = host_id
1907 resources['image_id'] = server['image_id']
1908 resources['flavor_id'] = server['flavor_id']
1909 resources['tenant_id'] = server['tenant_id']
1910 resources['ram'] = requirements['ram']
1911 resources['vcpus'] = requirements['vcpus']
1912 resources['status'] = 'CREATING'
1913
1914 if 'description' in server: resources['description'] = server['description']
1915 if 'name' in server: resources['name'] = server['name']
1916
1917 resources['extended'] = {} #optional
1918 resources['extended']['numas'] = []
1919 numa_dict['numa_id'] = numa_id
1920 numa_dict['memory'] = requirements['numa']['memory']
1921 numa_dict['cores'] = []
1922
1923 for core in cpu_pinning:
1924 numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
1925 for core in reserved_threads:
1926 numa_dict['cores'].append({'id': core})
1927 resources['extended']['numas'].append(numa_dict)
1928 if extended!=None and 'devices' in extended: #TODO allow extra devices without numa
1929 resources['extended']['devices'] = extended['devices']
1930
1931
1932 print '===================================={'
1933 print json.dumps(resources, indent=4)
1934 print '====================================}'
1935
1936 return 0, resources
1937