Merge "Fixed bug in openvim when connecting using ssh key file" into v1.0
[osm/openvim.git] / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 This is the thread that interacts with the host and with libvirt to manage VMs.
26 One thread is launched per host.
27 '''
28 __author__="Pablo Montes, Alfonso Tierno"
29 __date__ ="$10-jul-2014 12:07:15$"
30
31
32 import json
33 import yaml
34 import threading
35 import time
36 import Queue
37 import paramiko
38 from jsonschema import validate as js_v, exceptions as js_e
39 import libvirt
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 #from logging import Logger
43 #import auxiliary_functions as af
44
45 #TODO: insert a logging system
46
47 class host_thread(threading.Thread):
48 def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, develop_bridge_iface):
49 '''Init a thread.
50 Arguments:
51 'id' number of thread
52 'name' name of thread
53 'host','user': host ip or name to manage and user
54 'db', 'db_lock': database class and lock to use it in exclusion
55 '''
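        # A minimal usage sketch (all values below are illustrative, not taken from any real config):
        #   thread = host_thread(name='host1', host='10.0.0.1', user='vimuser', db=db, db_lock=db_lock,
        #                        test=False, image_path='/opt/VNF/images', host_id=host_uuid,
        #                        version=version, develop_mode=False, develop_bridge_iface='virbrMan')
        #   thread.start()                          # runs self.run() in a new thread
        #   thread.insert_task('instance', {...})   # work is handed over through the task queue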
56 threading.Thread.__init__(self)
57 self.name = name
58 self.host = host
59 self.user = user
60 self.db = db
61 self.db_lock = db_lock
62 self.test = test
63 self.develop_mode = develop_mode
64 self.develop_bridge_iface = develop_bridge_iface
65 self.image_path = image_path
66 self.host_id = host_id
67 self.version = version
68
69 self.xml_level = 0
70 #self.pending ={}
71
72 self.server_status = {} #dictionary with pairs server_uuid:server_status
73 self.pending_terminate_server =[] #list of pairs (time, server_uuid): time at which to send a forced terminate to a server being destroyed
74 self.next_update_server_status = 0 #time when servers status must be checked next
75
76 self.hostinfo = None
77
78 self.queueLock = threading.Lock()
79 self.taskQueue = Queue.Queue(2000)
80
81 def ssh_connect(self):
82 try:
83 #Connect SSH
84 self.ssh_conn = paramiko.SSHClient()
85 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
86 self.ssh_conn.load_system_host_keys()
87 self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
88 except paramiko.ssh_exception.SSHException as e:
89 text = e.args[0]
90 print self.name, ": ssh_connect ssh Exception:", text
91
92 def load_localinfo(self):
93 if not self.test:
94 try:
95 #Connect SSH
96 self.ssh_connect()
97
98 command = 'mkdir -p ' + self.image_path
99 #print self.name, ': command:', command
100 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
101 content = stderr.read()
102 if len(content) > 0:
103 print self.name, ': command:', command, "stderr:", content
104
105 command = 'cat ' + self.image_path + '/.openvim.yaml'
106 #print self.name, ': command:', command
107 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
108 content = stdout.read()
109 if len(content) == 0:
110 print self.name, ': command:', command, "stderr:", stderr.read()
111 raise paramiko.ssh_exception.SSHException("Error empty file ")
112 self.localinfo = yaml.load(content)
113 js_v(self.localinfo, localinfo_schema)
114 self.localinfo_dirty=False
115 if 'server_files' not in self.localinfo:
116 self.localinfo['server_files'] = {}
117 print self.name, ': localinfo loaded from host'
118 return
119
120 except paramiko.ssh_exception.SSHException as e:
121 text = e.args[0]
122 print self.name, ": load_localinfo ssh Exception:", text
123 except libvirt.libvirtError as e:
124 text = e.get_error_message()
125 print self.name, ": load_localinfo libvirt Exception:", text
126 except yaml.YAMLError as exc:
127 text = ""
128 if hasattr(exc, 'problem_mark'):
129 mark = exc.problem_mark
130 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
131 print self.name, ": load_localinfo yaml format Exception", text
132 except js_e.ValidationError as e:
133 text = ""
134 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
135 print self.name, ": load_localinfo format Exception:", text, e.message
136 except Exception as e:
137 text = str(e)
138 print self.name, ": load_localinfo Exception:", text
139
140 #not loaded, insert default data; saving is not forced (dirty flag left False)
141 self.localinfo = {'files':{}, 'server_files':{} }
142 #self.localinfo_dirty=True
143 self.localinfo_dirty=False
144
145 def load_hostinfo(self):
146 if self.test:
147 return
148 try:
149 #Connect SSH
150 self.ssh_connect()
151
152
153 command = 'cat ' + self.image_path + '/hostinfo.yaml'
154 #print self.name, ': command:', command
155 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
156 content = stdout.read()
157 if len(content) == 0:
158 print self.name, ': command:', command, "stderr:", stderr.read()
159 raise paramiko.ssh_exception.SSHException("Error empty file ")
160 self.hostinfo = yaml.load(content)
161 js_v(self.hostinfo, hostinfo_schema)
162 print self.name, ': hostinfo loaded from host', self.hostinfo
163 return
164
165 except paramiko.ssh_exception.SSHException as e:
166 text = e.args[0]
167 print self.name, ": load_hostinfo ssh Exception:", text
168 except libvirt.libvirtError as e:
169 text = e.get_error_message()
170 print self.name, ": load_hostinfo libvirt Exception:", text
171 except yaml.YAMLError as exc:
172 text = ""
173 if hasattr(exc, 'problem_mark'):
174 mark = exc.problem_mark
175 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
176 print self.name, ": load_hostinfo yaml format Exception", text
177 except js_e.ValidationError as e:
178 text = ""
179 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
180 print self.name, ": load_hostinfo format Exception:", text, e.message
181 except Exception as e:
182 text = str(e)
183 print self.name, ": load_hostinfo Exception:", text
184
185 #not loaded, insert a default data
186 self.hostinfo = None
187
188 def save_localinfo(self, tries=3):
189 if self.test:
190 self.localinfo_dirty = False
191 return
192
193 while tries>=0:
194 tries-=1
195
196 try:
197 command = 'cat > ' + self.image_path + '/.openvim.yaml'
198 print self.name, ': command:', command
199 (stdin, _, _) = self.ssh_conn.exec_command(command)
200 yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
201 self.localinfo_dirty = False
202 break #while tries
203
204 except paramiko.ssh_exception.SSHException as e:
205 text = e.args[0]
206 print self.name, ": save_localinfo ssh Exception:", text
207 if "SSH session not active" in text:
208 self.ssh_connect()
209 except libvirt.libvirtError as e:
210 text = e.get_error_message()
211 print self.name, ": save_localinfo libvirt Exception:", text
212 except yaml.YAMLError as exc:
213 text = ""
214 if hasattr(exc, 'problem_mark'):
215 mark = exc.problem_mark
216 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
217 print self.name, ": save_localinfo yaml format Exception", text
218 except Exception as e:
219 text = str(e)
220 print self.name, ": save_localinfo Exception:", text
221
222 def load_servers_from_db(self):
223 self.db_lock.acquire()
224 r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
225 self.db_lock.release()
226
227 self.server_status = {}
228 if r<0:
229 print self.name, ": Error getting data from database:", c
230 return
231 for server in c:
232 self.server_status[ server['uuid'] ] = server['status']
233
234 #convert from old version to new one
235 if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
236 server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
237 if server_files_dict['source file'][-5:] == 'qcow2':
238 server_files_dict['file format'] = 'qcow2'
239
240 self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
241 if 'inc_files' in self.localinfo:
242 del self.localinfo['inc_files']
243 self.localinfo_dirty = True
244
245 def delete_unused_files(self):
246 '''Compares self.localinfo['server_files'] content with the servers running (self.server_status obtained from the database).
247 Deletes unused entries at self.localinfo and the corresponding local files.
248 The only reason for this mismatch is the manual deletion of instances (VMs) at the database.
249 '''
250 if self.test:
251 return
252 for uuid,images in self.localinfo['server_files'].items():
253 if uuid not in self.server_status:
254 for localfile in images.values():
255 try:
256 print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid)
257 self.delete_file(localfile['source file'])
258 except paramiko.ssh_exception.SSHException as e:
259 print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e))
260 del self.localinfo['server_files'][uuid]
261 self.localinfo_dirty = True
262
263 def insert_task(self, task, *aditional):
264 try:
265 self.queueLock.acquire()
266 task = self.taskQueue.put( (task,) + aditional, timeout=5)
267 self.queueLock.release()
268 return 1, None
269 except Queue.Full:
270 return -1, "timeout inserting a task over host " + self.name
271
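    # Tasks are queued as tuples (task_name, arg1, arg2, ...) and consumed by run() below.
    # A sketch of the shapes used in this file (dict content is illustrative, not the full schema):
    #   self.insert_task('instance', {'uuid': server_uuid, 'action': {'start': None}, 'status': 'INACTIVE'})
    #   self.insert_task('edit-iface', port_id, old_net_id, new_net_id)
    #   self.insert_task('exit')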
272 def run(self):
273 while True:
274 self.load_localinfo()
275 self.load_hostinfo()
276 self.load_servers_from_db()
277 self.delete_unused_files()
278 while True:
279 self.queueLock.acquire()
280 if not self.taskQueue.empty():
281 task = self.taskQueue.get()
282 else:
283 task = None
284 self.queueLock.release()
285
286 if task is None:
287 now=time.time()
288 if self.localinfo_dirty:
289 self.save_localinfo()
290 elif self.next_update_server_status < now:
291 self.update_servers_status()
292 self.next_update_server_status = now + 5
293 elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
294 self.server_forceoff()
295 else:
296 time.sleep(1)
297 continue
298
299 if task[0] == 'instance':
300 print self.name, ": processing task instance", task[1]['action']
301 retry=0
302 while retry <2:
303 retry += 1
304 r=self.action_on_server(task[1], retry==2)
305 if r>=0:
306 break
307 elif task[0] == 'image':
308 pass
309 elif task[0] == 'exit':
310 print self.name, ": processing task exit"
311 self.terminate()
312 return 0
313 elif task[0] == 'reload':
314 print self.name, ": processing task reload terminating and relaunching"
315 self.terminate()
316 break
317 elif task[0] == 'edit-iface':
318 print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3])
319 self.edit_iface(task[1], task[2], task[3])
320 elif task[0] == 'restore-iface':
321 print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2])
322 self.restore_iface(task[1], task[2])
323 else:
324 print self.name, ": unknown task", task
325
326 def server_forceoff(self, wait_until_finished=False):
327 while len(self.pending_terminate_server)>0:
328 now = time.time()
329 if self.pending_terminate_server[0][0]>now:
330 if wait_until_finished:
331 time.sleep(1)
332 continue
333 else:
334 return
335 req={'uuid':self.pending_terminate_server[0][1],
336 'action':{'terminate':'force'},
337 'status': None
338 }
339 self.action_on_server(req)
340 self.pending_terminate_server.pop(0)
341
342 def terminate(self):
343 try:
344 self.server_forceoff(True)
345 if self.localinfo_dirty:
346 self.save_localinfo()
347 if not self.test:
348 self.ssh_conn.close()
349 except Exception as e:
350 text = str(e)
351 print self.name, ": terminate Exception:", text
352 print self.name, ": exit from host_thread"
353
354 def get_local_iface_name(self, generic_name):
355 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
356 return self.hostinfo["iface_names"][generic_name]
357 return generic_name
358
359 def create_xml_server(self, server, dev_list, server_metadata={}):
360 """Function that implements the generation of the VM XML definition.
361 Additional devices are in the dev_list list.
362 The main disk is dev_list[0]"""
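        # A sketch of a dev_list entry as consumed below (keys as used in this method; values hypothetical):
        #   {'type': 'disk', 'source file': '/opt/VNF/images/vm.qcow2', 'file format': 'qcow2',
        #    'vpci': '0000:00:05.0', 'metadata': {'os_type': 'linux'}}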
363
364 #get if operating system is Windows
365 windows_os = False
366 os_type = server_metadata.get('os_type', None)
367 if os_type == None and 'metadata' in dev_list[0]:
368 os_type = dev_list[0]['metadata'].get('os_type', None)
369 if os_type != None and os_type.lower() == "windows":
370 windows_os = True
371 #get type of hard disk bus
372 bus_ide = True if windows_os else False
373 bus = server_metadata.get('bus', None)
374 if bus == None and 'metadata' in dev_list[0]:
375 bus = dev_list[0]['metadata'].get('bus', None)
376 if bus != None:
377 bus_ide = True if bus=='ide' else False
378
379 self.xml_level = 0
380
381 text = "<domain type='kvm'>"
382 #get topology
383 topo = server_metadata.get('topology', None)
384 if topo == None and 'metadata' in dev_list[0]:
385 topo = dev_list[0]['metadata'].get('topology', None)
386 #name
387 name = server.get('name','') + "_" + server['uuid']
388 name = name[:58] #qemu imposes a length limit of 59 chars or the domain will not start. Using 58
389 text += self.inc_tab() + "<name>" + name+ "</name>"
390 #uuid
391 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
392
393 numa={}
394 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
395 numa = server['extended']['numas'][0]
396 #memory
397 use_huge = False
398 memory = int(numa.get('memory',0))*1024*1024 #in KiB
399 if memory==0:
400 memory = int(server['ram'])*1024
401 else:
402 if not self.develop_mode:
403 use_huge = True
404 if memory==0:
405 return -1, 'No memory assigned to instance'
406 memory = str(memory)
407 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
408 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
409 if use_huge:
410 text += self.tab()+'<memoryBacking>'+ \
411 self.inc_tab() + '<hugepages/>'+ \
412 self.dec_tab()+ '</memoryBacking>'
413
414 #cpu
415 use_cpu_pinning=False
416 vcpus = int(server.get("vcpus",0))
417 cpu_pinning = []
418 if 'cores-source' in numa:
419 use_cpu_pinning=True
420 for index in range(0, len(numa['cores-source'])):
421 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
422 vcpus += 1
423 if 'threads-source' in numa:
424 use_cpu_pinning=True
425 for index in range(0, len(numa['threads-source'])):
426 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
427 vcpus += 1
428 if 'paired-threads-source' in numa:
429 use_cpu_pinning=True
430 for index in range(0, len(numa['paired-threads-source'])):
431 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
432 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
433 vcpus += 2
434
435 if use_cpu_pinning and not self.develop_mode:
436 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
437 self.tab()+'<cputune>'
438 self.xml_level += 1
439 for i in range(0, len(cpu_pinning)):
440 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
441 text += self.dec_tab()+'</cputune>'+ \
442 self.tab() + '<numatune>' +\
443 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
444 self.dec_tab() + '</numatune>'
445 else:
446 if vcpus==0:
447 return -1, "Instance without number of cpus"
448 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
449
450 #boot
451 boot_cdrom = False
452 for dev in dev_list:
453 if dev['type']=='cdrom' :
454 boot_cdrom = True
455 break
456 text += self.tab()+ '<os>' + \
457 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
458 if boot_cdrom:
459 text += self.tab() + "<boot dev='cdrom'/>"
460 text += self.tab() + "<boot dev='hd'/>" + \
461 self.dec_tab()+'</os>'
462 #features
463 text += self.tab()+'<features>'+\
464 self.inc_tab()+'<acpi/>' +\
465 self.tab()+'<apic/>' +\
466 self.tab()+'<pae/>'+ \
467 self.dec_tab() +'</features>'
468 if windows_os or topo=="oneSocket":
469 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
470 else:
471 text += self.tab() + "<cpu mode='host-model'></cpu>"
472 text += self.tab() + "<clock offset='utc'/>" +\
473 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
474 self.tab() + "<on_reboot>restart</on_reboot>" + \
475 self.tab() + "<on_crash>restart</on_crash>"
476 text += self.tab() + "<devices>" + \
477 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
478 self.tab() + "<serial type='pty'>" +\
479 self.inc_tab() + "<target port='0'/>" + \
480 self.dec_tab() + "</serial>" +\
481 self.tab() + "<console type='pty'>" + \
482 self.inc_tab()+ "<target type='serial' port='0'/>" + \
483 self.dec_tab()+'</console>'
484 if windows_os:
485 text += self.tab() + "<controller type='usb' index='0'/>" + \
486 self.tab() + "<controller type='ide' index='0'/>" + \
487 self.tab() + "<input type='mouse' bus='ps2'/>" + \
488 self.tab() + "<sound model='ich6'/>" + \
489 self.tab() + "<video>" + \
490 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
491 self.dec_tab() + "</video>" + \
492 self.tab() + "<memballoon model='virtio'/>" + \
493 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
494
495 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
496 #> self.dec_tab()+'</hostdev>\n' +\
497 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
498 if windows_os:
499 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
500 else:
501 #If image contains 'GRAPH' include graphics
502 #if 'GRAPH' in image:
503 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
504 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
505 self.dec_tab() + "</graphics>"
506
507 vd_index = 'a'
508 for dev in dev_list:
509 bus_ide_dev = bus_ide
510 if dev['type']=='cdrom' or dev['type']=='disk':
511 if dev['type']=='cdrom':
512 bus_ide_dev = True
513 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
514 if 'file format' in dev:
515 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
516 if 'source file' in dev:
517 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
518 #elif v['type'] == 'block':
519 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
520 #else:
521 # return -1, 'Unknown disk type ' + v['type']
522 vpci = dev.get('vpci',None)
523 if vpci == None:
524 vpci = dev['metadata'].get('vpci',None)
525 text += self.pci2xml(vpci)
526
527 if bus_ide_dev:
528 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
529 else:
530 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
531 text += self.dec_tab() + '</disk>'
532 vd_index = chr(ord(vd_index)+1)
533 elif dev['type']=='xml':
534 dev_text = dev['xml']
535 if 'vpci' in dev:
536 dev_text = dev_text.replace('__vpci__', dev['vpci'])
537 if 'source file' in dev:
538 dev_text = dev_text.replace('__file__', dev['source file'])
539 if 'file format' in dev:
540 dev_text = dev_text.replace('__format__', dev['file format'])
541 if '__dev__' in dev_text:
542 dev_text = dev_text.replace('__dev__', vd_index)
543 vd_index = chr(ord(vd_index)+1)
544 text += dev_text
545 else:
546 return -1, 'Unknown device type ' + dev['type']
547
548 net_nb=0
549 bridge_interfaces = server.get('networks', [])
550 for v in bridge_interfaces:
551 #Get the bridge name
552 self.db_lock.acquire()
553 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
554 self.db_lock.release()
555 if result <= 0:
556 print "create_xml_server ERROR getting nets",result, content
557 return -1, content
558 #ALF: For the moment allow the 'default' bridge net because it is convenient for providing internet access to the VM
559 #I know it is not secure
560 #for v in sorted(desc['network interfaces'].itervalues()):
561 model = v.get("model", None)
562 if content[0]['provider']=='default':
563 text += self.tab() + "<interface type='network'>" + \
564 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
565 elif content[0]['provider'][0:7]=='macvtap':
566 text += self.tab()+"<interface type='direct'>" + \
567 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
568 self.tab() + "<target dev='macvtap0'/>"
569 if windows_os:
570 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
571 elif model==None:
572 model = "virtio"
573 elif content[0]['provider'][0:6]=='bridge':
574 text += self.tab() + "<interface type='bridge'>" + \
575 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
576 if windows_os:
577 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
578 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
579 elif model==None:
580 model = "virtio"
581 else:
582 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
583 if model!=None:
584 text += self.tab() + "<model type='" +model+ "'/>"
585 if v.get('mac_address', None) != None:
586 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
587 text += self.pci2xml(v.get('vpci',None))
588 text += self.dec_tab()+'</interface>'
589
590 net_nb += 1
591
592 interfaces = numa.get('interfaces', [])
593
594 net_nb=0
595 for v in interfaces:
596 if self.develop_mode: #map these interfaces to bridges
597 text += self.tab() + "<interface type='bridge'>" + \
598 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
599 if windows_os:
600 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
601 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
602 else:
603 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
604 if v.get('mac_address', None) != None:
605 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
606 text += self.pci2xml(v.get('vpci',None))
607 text += self.dec_tab()+'</interface>'
608 continue
609
610 if v['dedicated'] == 'yes': #passthrough
611 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
612 self.inc_tab() + "<source>"
613 self.inc_tab()
614 text += self.pci2xml(v['source'])
615 text += self.dec_tab()+'</source>'
616 text += self.pci2xml(v.get('vpci',None))
617 if windows_os:
618 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
619 text += self.dec_tab()+'</hostdev>'
620 net_nb += 1
621 else: #sriov_interfaces
622 #skip interfaces that are not connected
623 if v.get("net_id") == None:
624 continue
625 text += self.tab() + "<interface type='hostdev' managed='yes'>"
626 self.inc_tab()
627 if v.get('mac_address', None) != None:
628 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
629 text+= self.tab()+'<source>'
630 self.inc_tab()
631 text += self.pci2xml(v['source'])
632 text += self.dec_tab()+'</source>'
633 if v.get('vlan',None) != None:
634 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
635 text += self.pci2xml(v.get('vpci',None))
636 if windows_os:
637 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
638 text += self.dec_tab()+'</interface>'
639
640
641 text += self.dec_tab()+'</devices>'+\
642 self.dec_tab()+'</domain>'
643 return 0, text
644
645 def pci2xml(self, pci):
646 '''From a pci text in the format XXXX:XX:XX.X generates the xml content of <address>
647 allows an empty pci text'''
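        # Worked example (illustrative): pci2xml('0000:04:10.1') returns, leading indentation aside,
        #   <address type='pci' domain='0x0000' bus='0x04' slot='0x10' function='0x1'/>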
648 if pci is None:
649 return ""
650 first_part = pci.split(':')
651 second_part = first_part[2].split('.')
652 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
653 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
654 "' function='0x" + second_part[1] + "'/>"
655
656 def tab(self):
657 """Return indentation according to xml_level"""
658 return "\n" + (' '*self.xml_level)
659
660 def inc_tab(self):
661 """Increment and return indentation according to xml_level"""
662 self.xml_level += 1
663 return self.tab()
664
665 def dec_tab(self):
666 """Decrement and return indentation according to xml_level"""
667 self.xml_level -= 1
668 return self.tab()
669
670 def get_file_info(self, path):
671 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
672 print self.name, ': command:', command
673 (_, stdout, _) = self.ssh_conn.exec_command(command)
674 content = stdout.read()
675 if len(content) == 0:
676 return None # file does not exist
677 else:
678 return content.split(" ") #(permission, 1, owner, group, size, date, file)
679
680 def qemu_get_info(self, path):
681 command = 'qemu-img info ' + path
682 print self.name, ': command:', command
683 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
684 content = stdout.read()
685 if len(content) == 0:
686 error = stderr.read()
687 print self.name, ": get_qemu_info error ", error
688 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
689 else:
690 try:
691 return yaml.load(content)
692 except yaml.YAMLError as exc:
693 text = ""
694 if hasattr(exc, 'problem_mark'):
695 mark = exc.problem_mark
696 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
697 print self.name, ": get_qemu_info yaml format Exception", text
698 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
699
700 def qemu_change_backing(self, inc_file, new_backing_file):
701 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
702 print self.name, ': command:', command
703 (_, _, stderr) = self.ssh_conn.exec_command(command)
704 content = stderr.read()
705 if len(content) == 0:
706 return 0
707 else:
708 print self.name, ": qemu_change_backing error: ", content
709 return -1
710
711 def get_notused_filename(self, proposed_name, suffix=''):
712 '''Look for a non existing file_name on the host
713 proposed_name: proposed file name, includes path
714 suffix: suffix to be added to the name, before the extension
715 '''
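        # Example with hypothetical paths: proposed_name='/opt/VNF/images/vm.qcow2', suffix='.inc'
        # first tries '/opt/VNF/images/vm.inc.qcow2'; if that exists it tries '/opt/VNF/images/vm.inc-0.qcow2',
        # '/opt/VNF/images/vm.inc-1.qcow2', ... until an unused name is found.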
716 extension = proposed_name.rfind(".")
717 slash = proposed_name.rfind("/")
718 if extension < 0 or extension < slash: # no extension
719 extension = len(proposed_name)
720 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
721 info = self.get_file_info(target_name)
722 if info is None:
723 return target_name
724
725 index=0
726 while info is not None:
727 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
728 index+=1
729 info = self.get_file_info(target_name)
730 return target_name
731
732 def get_notused_path(self, proposed_path, suffix=''):
733 '''Look for a non existing image path at the database
734 proposed_path: proposed file name, includes path
735 suffix: suffix to be added to the name, before the extension
736 '''
737 extension = proposed_path.rfind(".")
738 if extension < 0:
739 extension = len(proposed_path)
740 if suffix != None:
741 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
742 index=0
743 while True:
744 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
745 if r<=0:
746 return target_path
747 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
748 index+=1
749
750
751 def delete_file(self, file_name):
752 command = 'rm -f '+file_name
753 print self.name, ': command:', command
754 (_, _, stderr) = self.ssh_conn.exec_command(command)
755 error_msg = stderr.read()
756 if len(error_msg) > 0:
757 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
758
759 def copy_file(self, source, destination, perserve_time=True):
760 if source[0:4]=="http":
761 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
762 dst=destination, src=source, dst_result=destination + ".result" )
763 else:
764 command = 'cp --no-preserve=mode'
765 if perserve_time:
766 command += ' --preserve=timestamps'
767 command += " '{}' '{}'".format(source, destination)
768 print self.name, ': command:', command
769 (_, _, stderr) = self.ssh_conn.exec_command(command)
770 error_msg = stderr.read()
771 if len(error_msg) > 0:
772 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
773
774 def copy_remote_file(self, remote_file, use_incremental):
775 ''' Copy a file from the repository to local folder and recursively
776 copy the backing files in case the remote file is incremental
777 Reads and/or modifies self.localinfo['files'], which contains the
778 unmodified copies of images in the local path
779 params:
780 remote_file: path of remote file
781 use_incremental: None (leave the decision to this function), True, False
782 return:
783 local_file: name of local file
784 qemu_info: dict with qemu information of the local file
785 use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
786 '''
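        # A sketch of a possible return value (paths and qemu-img fields are illustrative):
        #   local_file = '/opt/VNF/images/ubuntu.qcow2'
        #   qemu_info = {'file format': 'qcow2', 'virtual size': '10G (10737418240 bytes)'}
        #   use_incremental_out = True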
787
788 use_incremental_out = use_incremental
789 new_backing_file = None
790 local_file = None
791 file_from_local = True
792
793 #in case incremental use is not decided, take the decision depending on the image
794 #avoid the use of incremental if this image is already incremental
795 if remote_file[0:4] == "http":
796 file_from_local = False
797 if file_from_local:
798 qemu_remote_info = self.qemu_get_info(remote_file)
799 if use_incremental_out==None:
800 use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
801 #copy the backing files recursively
802 if file_from_local and 'backing file' in qemu_remote_info:
803 new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)
804
805 #check if remote file is present locally
806 if use_incremental_out and remote_file in self.localinfo['files']:
807 local_file = self.localinfo['files'][remote_file]
808 local_file_info = self.get_file_info(local_file)
809 if file_from_local:
810 remote_file_info = self.get_file_info(remote_file)
811 if local_file_info == None:
812 local_file = None
813 elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
814 #local copy of file not valid because date or size are different.
815 #TODO DELETE local file if this file is not used by any active virtual machine
816 try:
817 self.delete_file(local_file)
818 del self.localinfo['files'][remote_file]
819 except Exception:
820 pass
821 local_file = None
822 else: #check that the local file has the same backing file, or there are not backing at all
823 qemu_info = self.qemu_get_info(local_file)
824 if new_backing_file != qemu_info.get('backing file'):
825 local_file = None
826
827
828 if local_file == None: #copy the file
829 img_name= remote_file.split('/') [-1]
830 img_local = self.image_path + '/' + img_name
831 local_file = self.get_notused_filename(img_local)
832 self.copy_file(remote_file, local_file, use_incremental_out)
833
834 if use_incremental_out:
835 self.localinfo['files'][remote_file] = local_file
836 if new_backing_file:
837 self.qemu_change_backing(local_file, new_backing_file)
838 qemu_info = self.qemu_get_info(local_file)
839
840 return local_file, qemu_info, use_incremental_out
841
842 def launch_server(self, conn, server, rebuild=False, domain=None):
843 if self.test:
844 time.sleep(random.randint(20,150)) #sleep a random time to make it a bit more realistic
845 return 0, 'Success'
846
847 server_id = server['uuid']
848 paused = server.get('paused','no')
849 try:
850 if domain!=None and rebuild==False:
851 domain.resume()
852 #self.server_status[server_id] = 'ACTIVE'
853 return 0, 'Success'
854
855 self.db_lock.acquire()
856 result, server_data = self.db.get_instance(server_id)
857 self.db_lock.release()
858 if result <= 0:
859 print self.name, ": launch_server ERROR getting server from DB",result, server_data
860 return result, server_data
861
862 #0: get image metadata
863 server_metadata = server.get('metadata', {})
864 use_incremental = None
865
866 if "use_incremental" in server_metadata:
867 use_incremental = False if server_metadata["use_incremental"]=="no" else True
868
869 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
870 if rebuild:
871 #delete previous incremental files
872 for file_ in server_host_files.values():
873 self.delete_file(file_['source file'] )
874 server_host_files={}
875
876 #1: obtain additional devices (disks)
877 #Put the main disk as the first device
878 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
879 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
880 devices += server_data['extended']['devices']
881
882 for dev in devices:
883 if dev['image_id'] == None:
884 continue
885
886 self.db_lock.acquire()
887 result, content = self.db.get_table(FROM='images', SELECT=('path','metadata'),WHERE={'uuid':dev['image_id']} )
888 self.db_lock.release()
889 if result <= 0:
890 error_text = "ERROR", result, content, "when getting image", dev['image_id']
891 print self.name, ": launch_server", error_text
892 return -1, error_text
893 if content[0]['metadata'] is not None:
894 dev['metadata'] = json.loads(content[0]['metadata'])
895 else:
896 dev['metadata'] = {}
897
898 if dev['image_id'] in server_host_files:
899 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
900 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
901 continue
902
903 #2: copy image to host
904 remote_file = content[0]['path']
905 use_incremental_image = use_incremental
906 if dev['metadata'].get("use_incremental") == "no":
907 use_incremental_image = False
908 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
909
910 #create incremental image
911 if use_incremental_image:
912 local_file_inc = self.get_notused_filename(local_file, '.inc')
913 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
914 print 'command:', command
915 (_, _, stderr) = self.ssh_conn.exec_command(command)
916 error_msg = stderr.read()
917 if len(error_msg) > 0:
918 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
919 local_file = local_file_inc
920 qemu_info = {'file format':'qcow2'}
921
922 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
923
924 dev['source file'] = local_file
925 dev['file format'] = qemu_info['file format']
926
927 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
928 self.localinfo_dirty = True
929
930 #3 Create XML
931 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
932 if result <0:
933 print self.name, ": create xml server error:", xml
934 return -2, xml
935 print self.name, ": create xml:", xml
936 atribute = libvirt.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
937 #4 Start the domain
938 if not rebuild: #ensures that any pending destroying server is done
939 self.server_forceoff(True)
940 #print self.name, ": launching instance" #, xml
941 conn.createXML(xml, atribute)
942 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
943
944 return 0, 'Success'
945
946 except paramiko.ssh_exception.SSHException as e:
947 text = e.args[0]
948 print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
949 if "SSH session not active" in text:
950 self.ssh_connect()
951 except libvirt.libvirtError as e:
952 text = e.get_error_message()
953 print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
954 except Exception as e:
955 text = str(e)
956 print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
957 return -1, text
958
959 def update_servers_status(self):
960 # # virDomainState
961 # VIR_DOMAIN_NOSTATE = 0
962 # VIR_DOMAIN_RUNNING = 1
963 # VIR_DOMAIN_BLOCKED = 2
964 # VIR_DOMAIN_PAUSED = 3
965 # VIR_DOMAIN_SHUTDOWN = 4
966 # VIR_DOMAIN_SHUTOFF = 5
967 # VIR_DOMAIN_CRASHED = 6
968 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
969
970 if self.test or len(self.server_status)==0:
971 return
972
973 try:
974 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
975 domains= conn.listAllDomains()
976 domain_dict={}
977 for domain in domains:
978 uuid = domain.UUIDString()
979 libvirt_status = domain.state()
980 #print libvirt_status
981 if libvirt_status[0] == libvirt.VIR_DOMAIN_RUNNING or libvirt_status[0] == libvirt.VIR_DOMAIN_SHUTDOWN:
982 new_status = "ACTIVE"
983 elif libvirt_status[0] == libvirt.VIR_DOMAIN_PAUSED:
984 new_status = "PAUSED"
985 elif libvirt_status[0] == libvirt.VIR_DOMAIN_SHUTOFF:
986 new_status = "INACTIVE"
987 elif libvirt_status[0] == libvirt.VIR_DOMAIN_CRASHED:
988 new_status = "ERROR"
989 else:
990 new_status = None
991 domain_dict[uuid] = new_status
992 conn.close()
993 except libvirt.libvirtError as e:
994 print self.name, ": get_state() Exception '", e.get_error_message()
995 return
996
997 for server_id, current_status in self.server_status.iteritems():
998 new_status = None
999 if server_id in domain_dict:
1000 new_status = domain_dict[server_id]
1001 else:
1002 new_status = "INACTIVE"
1003
1004 if new_status == None or new_status == current_status:
1005 continue
1006 if new_status == 'INACTIVE' and current_status == 'ERROR':
1007 continue #keep ERROR status, because obviously this machine is not running
1008 #change status
1009 print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
1010 STATUS={'progress':100, 'status':new_status}
1011 if new_status == 'ERROR':
1012 STATUS['last_error'] = 'machine has crashed'
1013 self.db_lock.acquire()
1014 r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
1015 self.db_lock.release()
1016 if r>=0:
1017 self.server_status[server_id] = new_status
1018
1019 def action_on_server(self, req, last_retry=True):
1020 '''Perform an action on a req
1021 Attributes:
1022 req: dictionary that contain:
1023 server properties: 'uuid','name','tenant_id','status'
1024 action: 'action'
1025 host properties: 'user', 'ip_name'
1026 return (error, text)
1027 0: No error. VM is updated to new state,
1028 -1: Invalid action, as trying to pause a PAUSED VM
1029 -2: Error accessing host
1030 -3: VM not present
1031 -4: Error at DB access
1032 -5: Error while trying to perform action. VM is updated to ERROR
1033 '''
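        # A sketch of a 'req' for a forced terminate (content illustrative, not the full schema):
        #   {'uuid': server_uuid, 'tenant_id': tenant_uuid, 'status': 'ACTIVE',
        #    'action': {'terminate': 'force'}}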
1034 server_id = req['uuid']
1035 conn = None
1036 new_status = None
1037 old_status = req['status']
1038 last_error = None
1039
1040 if self.test:
1041 if 'terminate' in req['action']:
1042 new_status = 'deleted'
1043 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1044 if req['status']!='ERROR':
1045 time.sleep(5)
1046 new_status = 'INACTIVE'
1047 elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1048 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
1049 elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
1050 elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1051 elif 'rebuild' in req['action']:
1052 time.sleep(random.randint(20,150))
1053 new_status = 'ACTIVE'
1054 elif 'createImage' in req['action']:
1055 time.sleep(5)
1056 self.create_image(None, req)
1057 else:
1058 try:
1059 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1060 try:
1061 dom = conn.lookupByUUIDString(server_id)
1062 except libvirt.libvirtError as e:
1063 text = e.get_error_message()
1064 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1065 dom = None
1066 else:
1067 print self.name, ": action_on_server(",server_id,") libvirt exception:", text
1068 raise e
1069
1070 if 'forceOff' in req['action']:
1071 if dom == None:
1072 print self.name, ": action_on_server(",server_id,") domain not running"
1073 else:
1074 try:
1075 print self.name, ": sending DESTROY to server", server_id
1076 dom.destroy()
1077 except Exception as e:
1078 if "domain is not running" not in e.get_error_message():
1079 print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
1080 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1081 new_status = 'ERROR'
1082
1083 elif 'terminate' in req['action']:
1084 if dom == None:
1085 print self.name, ": action_on_server(",server_id,") domain not running"
1086 new_status = 'deleted'
1087 else:
1088 try:
1089 if req['action']['terminate'] == 'force':
1090 print self.name, ": sending DESTROY to server", server_id
1091 dom.destroy()
1092 new_status = 'deleted'
1093 else:
1094 print self.name, ": sending SHUTDOWN to server", server_id
1095 dom.shutdown()
1096 self.pending_terminate_server.append( (time.time()+10,server_id) )
1097 except Exception as e:
1098 print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
1099 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1100 new_status = 'ERROR'
1101 if "domain is not running" in e.get_error_message():
1102 try:
1103 dom.undefine()
1104 new_status = 'deleted'
1105 except Exception:
1106 print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
1107 last_error = 'action_on_server Exception2 while undefine: ' + e.get_error_message()
1108 #Exception: 'virDomainDetachDevice() failed'
1109 if new_status=='deleted':
1110 if server_id in self.server_status:
1111 del self.server_status[server_id]
1112 if req['uuid'] in self.localinfo['server_files']:
1113 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1114 try:
1115 self.delete_file(file_['source file'])
1116 except Exception:
1117 pass
1118 del self.localinfo['server_files'][ req['uuid'] ]
1119 self.localinfo_dirty = True
1120
1121 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1122 try:
1123 if dom == None:
1124 print self.name, ": action_on_server(",server_id,") domain not running"
1125 else:
1126 dom.shutdown()
1127 # new_status = 'INACTIVE'
1128 #TODO: check status for changing at database
1129 except Exception as e:
1130 new_status = 'ERROR'
1131 print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
1132 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1133
1134 elif 'rebuild' in req['action']:
1135 if dom != None:
1136 dom.destroy()
1137 r = self.launch_server(conn, req, True, None)
1138 if r[0] <0:
1139 new_status = 'ERROR'
1140 last_error = r[1]
1141 else:
1142 new_status = 'ACTIVE'
1143 elif 'start' in req['action']:
1144 #The instance exists only at the database, not at libvirt; it needs to be created
1145 rebuild = True if req['action']['start']=='rebuild' else False
1146 r = self.launch_server(conn, req, rebuild, dom)
1147 if r[0] <0:
1148 new_status = 'ERROR'
1149 last_error = r[1]
1150 else:
1151 new_status = 'ACTIVE'
1152
1153 elif 'resume' in req['action']:
1154 try:
1155 if dom == None:
1156 pass
1157 else:
1158 dom.resume()
1159 # new_status = 'ACTIVE'
1160 except Exception as e:
1161 print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()
1162
1163 elif 'pause' in req['action']:
1164 try:
1165 if dom == None:
1166 pass
1167 else:
1168 dom.suspend()
1169 # new_status = 'PAUSED'
1170 except Exception as e:
1171 print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()
1172
1173 elif 'reboot' in req['action']:
1174 try:
1175 if dom == None:
1176 pass
1177 else:
1178 dom.reboot()
1179 print self.name, ": action_on_server(",server_id,") reboot:"
1180 #new_status = 'ACTIVE'
1181 except Exception as e:
1182 print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
1183 elif 'createImage' in req['action']:
1184 self.create_image(dom, req)
1185
1186
1187 conn.close()
1188 except libvirt.libvirtError as e:
1189 if conn is not None: conn.close()
1190 text = e.get_error_message()
1191 new_status = "ERROR"
1192 last_error = text
1193 print self.name, ": action_on_server(",server_id,") libvirt Exception:", text
1194 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1195 print self.name, ": action_on_server(",server_id,") Exception removed from host"
1196 #end of if self.test
1197 if new_status == None:
1198 return 1
1199
1200 print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
1201 UPDATE = {'progress':100, 'status':new_status}
1202
1203 if new_status=='ERROR':
1204 if not last_retry: #if there will be another retry do not update database
1205 return -1
1206 elif 'terminate' in req['action']:
1207 #PUT a log in the database
1208 print self.name, ": PANIC deleting server", server_id, last_error
1209 self.db_lock.acquire()
1210 self.db.new_row('logs',
1211 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1212 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1213 )
1214 self.db_lock.release()
1215 if server_id in self.server_status:
1216 del self.server_status[server_id]
1217 return -1
1218 else:
1219 UPDATE['last_error'] = last_error
1220 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1221 self.db_lock.acquire()
1222 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1223 self.server_status[server_id] = new_status
1224 self.db_lock.release()
1225 if new_status == 'ERROR':
1226 return -1
1227 return 1
1228
1229
1230 def restore_iface(self, name, mac, lib_conn=None):
1231 ''' Make an ifdown, ifup to restore the default parameters of an interface
1232 Params:
1233 mac: mac address of the interface
1234 lib_conn: connection to the libvirt, if None a new connection is created
1235 Return 0,None if ok, -1,text if fails
1236 '''
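        # Illustrative call (interface name and mac are hypothetical):
        #   self.restore_iface('eth1', '52:54:00:12:34:56')
        # which destroys and re-creates the interface through libvirt to restore its defaults.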
1237 conn=None
1238 ret = 0
1239 error_text=None
1240 if self.test:
1241 print self.name, ": restore_iface '%s' %s" % (name, mac)
1242 return 0, None
1243 try:
1244 if not lib_conn:
1245 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1246 else:
1247 conn = lib_conn
1248
1249 #wait to the pending VM deletion
1250 #TODO.Revise self.server_forceoff(True)
1251
1252 iface = conn.interfaceLookupByMACString(mac)
1253 iface.destroy()
1254 iface.create()
1255 print self.name, ": restore_iface '%s' %s" % (name, mac)
1256 except libvirt.libvirtError as e:
1257 error_text = e.get_error_message()
1258 print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
1259 ret=-1
1260 finally:
1261 if lib_conn is None and conn is not None:
1262 conn.close()
1263 return ret, error_text
1264
1265
1266 def create_image(self,dom, req):
1267 if self.test:
1268 if 'path' in req['action']['createImage']:
1269 file_dst = req['action']['createImage']['path']
1270 else:
1271 createImage=req['action']['createImage']
1272 img_name= createImage['source']['path']
1273 index=img_name.rfind('/')
1274 file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
1275 image_status='ACTIVE'
1276 else:
1277 for retry in (0,1):
1278 try:
1279 server_id = req['uuid']
1280 createImage=req['action']['createImage']
1281 file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
1282 if 'path' in req['action']['createImage']:
1283 file_dst = req['action']['createImage']['path']
1284 else:
1285 img_name= createImage['source']['path']
1286 index=img_name.rfind('/')
1287 file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
1288
1289 self.copy_file(file_orig, file_dst)
1290 qemu_info = self.qemu_get_info(file_orig)
1291 if 'backing file' in qemu_info:
1292 for k,v in self.localinfo['files'].items():
1293 if v==qemu_info['backing file']:
1294 self.qemu_change_backing(file_dst, k)
1295 break
1296 image_status='ACTIVE'
1297 break
1298 except paramiko.ssh_exception.SSHException as e:
1299 image_status='ERROR'
1300 error_text = e.args[0]
1301 print self.name, "': create_image(",server_id,") ssh Exception:", error_text
1302 if "SSH session not active" in error_text and retry==0:
1303 self.ssh_connect()
1304 except Exception as e:
1305 image_status='ERROR'
1306 error_text = str(e)
1307 print self.name, "': create_image(",server_id,") Exception:", error_text
1308
1309 #TODO insert a last_error at database
1310 self.db_lock.acquire()
1311 self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
1312 {'uuid':req['new_image']['uuid']}, log=True)
1313 self.db_lock.release()
1314
1315 def edit_iface(self, port_id, old_net, new_net):
1316 #This action implies removing and re-inserting the interface to set the proper parameters
1317 if self.test:
1318 time.sleep(1)
1319 else:
1320 #get iface details
1321 self.db_lock.acquire()
1322 r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
1323 WHERE={'port_id': port_id})
1324 self.db_lock.release()
1325 if r<0:
1326 print self.name, ": edit_iface(",port_id,") DDBB error:", c
1327 return
1328 elif r==0:
1329 print self.name, ": edit_iface(",port_id,") port not found"
1330 return
1331 port=c[0]
1332 if port["model"]!="VF":
1333 print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
1334 return
1335 #create xml detach file
1336 xml=[]
1337 self.xml_level = 2
1338 xml.append("<interface type='hostdev' managed='yes'>")
1339 xml.append(" <mac address='" +port['mac']+ "'/>")
1340 xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
1341 xml.append('</interface>')
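        # A sketch of the detach XML built above (mac and pci values are illustrative):
        #   <interface type='hostdev' managed='yes'>
        #    <mac address='52:54:00:12:34:56'/>
        #    <source>
        #     <address type='pci' domain='0x0000' bus='0x04' slot='0x10' function='0x1'/>
        #    </source>
        #   </interface>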
1342
1343
1344 try:
1345 conn=None
1346 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1347 dom = conn.lookupByUUIDString(port["instance_id"])
1348 if old_net:
1349 text="\n".join(xml)
1350 print self.name, ": edit_iface detaching SRIOV interface", text
1351 dom.detachDeviceFlags(text, flags=libvirt.VIR_DOMAIN_AFFECT_LIVE)
1352 if new_net:
1353 xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
1354 self.xml_level = 1
1355 xml.append(self.pci2xml(port.get('vpci',None)) )
1356 xml.append('</interface>')
1357 text="\n".join(xml)
1358 print self.name, ": edit_iface attaching SRIOV interface", text
1359 dom.attachDeviceFlags(text, flags=libvirt.VIR_DOMAIN_AFFECT_LIVE)
1360
1361 except libvirt.libvirtError as e:
1362 text = e.get_error_message()
1363 print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text
1364
1365 finally:
1366 if conn is not None: conn.close()
1367
1368
1369 def create_server(server, db, db_lock, only_of_ports):
1370 #print "server"
1371 #print "server"
1372 #print server
1373 #print "server"
1374 #print "server"
1375 #try:
1376 # host_id = server.get('host_id', None)
1377 extended = server.get('extended', None)
1378
1379 # print '----------------------'
1380 # print json.dumps(extended, indent=4)
1381
1382 requirements={}
1383 requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1384 requirements['ram'] = server['flavor'].get('ram', 0)
1385 if requirements['ram']== None:
1386 requirements['ram'] = 0
1387 requirements['vcpus'] = server['flavor'].get('vcpus', 0)
1388 if requirements['vcpus']== None:
1389 requirements['vcpus'] = 0
1390 #If extended is not defined get requirements from flavor
1391 if extended is None:
1392 #If extended is defined in flavor convert to dictionary and use it
1393 if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
1394 json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
1395 extended = json.loads(json_acceptable_string)
1396 else:
1397 extended = None
1398 #print json.dumps(extended, indent=4)
1399
1400 #For simplicity only single-NUMA VMs are supported in the initial implementation
1401 if extended != None:
1402 numas = extended.get('numas', [])
1403 if len(numas)>1:
1404 return (-2, "Multi-NUMA VMs are not supported yet")
1405 #elif len(numas)<1:
1406 # return (-1, "At least one numa must be specified")
1407
1408 #a for loop is used in order to be ready for multi-NUMA VMs
1409 request = []
1410 for numa in numas:
1411 numa_req = {}
1412 numa_req['memory'] = numa.get('memory', 0)
1413 if 'cores' in numa:
1414 numa_req['proc_req_nb'] = numa['cores'] #number of cores or threads to be reserved
1415 numa_req['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1416 numa_req['proc_req_list'] = numa.get('cores-id', None) #list of ids to be assigned to the cores or threads
1417 elif 'paired-threads' in numa:
1418 numa_req['proc_req_nb'] = numa['paired-threads']
1419 numa_req['proc_req_type'] = 'paired-threads'
1420 numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
1421 elif 'threads' in numa:
1422 numa_req['proc_req_nb'] = numa['threads']
1423 numa_req['proc_req_type'] = 'threads'
1424 numa_req['proc_req_list'] = numa.get('threads-id', None)
1425 else:
1426 numa_req['proc_req_nb'] = 0 # by default
1427 numa_req['proc_req_type'] = 'threads'
1428
1429
1430
1431 #Generate a list of sriov and another for physical interfaces
1432 interfaces = numa.get('interfaces', [])
1433 sriov_list = []
1434 port_list = []
1435 for iface in interfaces:
1436 iface['bandwidth'] = int(iface['bandwidth'])
1437 if iface['dedicated'][:3]=='yes':
1438 port_list.append(iface)
1439 else:
1440 sriov_list.append(iface)
1441
1442 #Save lists ordered from more to less restrictive bandwidth requirements
1443 numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
1444 numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)
1445
1446
1447 request.append(numa_req)
1448
1449 # print "----------\n"+json.dumps(request[0], indent=4)
1450 # print '----------\n\n'
1451
1452 #Search in db for an appropriate numa for each requested numa
1453 #at the moment multi-NUMA VMs are not supported
1454 if len(request)>0:
1455 requirements['numa'].update(request[0])
1456 if requirements['numa']['memory']>0:
1457 requirements['ram']=0 #For the moment, asking for both hugepage and non-hugepage memory is not supported
1458 elif requirements['ram']==0:
1459 return (-1, "Memory information not set, neither at the extended field nor at ram")
1460 if requirements['numa']['proc_req_nb']>0:
1461 requirements['vcpus']=0 #For the moment, asking for both isolated and non-isolated cpus is not supported
1462 elif requirements['vcpus']==0:
1463 return (-1, "Processor information not set, neither at the extended field nor at vcpus")
1464
1465
1466 db_lock.acquire()
1467 result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
1468 db_lock.release()
1469
1470 if result == -1:
1471 return (-1, content)
1472
1473 numa_id = content['numa_id']
1474 host_id = content['host_id']
1475
1476 #obtain threads_id and calculate pinning
1477 cpu_pinning = []
1478 reserved_threads=[]
1479 if requirements['numa']['proc_req_nb']>0:
1480 db_lock.acquire()
1481 result, content = db.get_table(FROM='resources_core',
1482 SELECT=('id','core_id','thread_id'),
1483 WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
1484 db_lock.release()
1485 if result <= 0:
1486 print content
1487 return -1, content
1488
1489 #convert rows to a dictionary indexed by core_id
1490 cores_dict = {}
1491 for row in content:
1492 if not row['core_id'] in cores_dict:
1493 cores_dict[row['core_id']] = []
1494 cores_dict[row['core_id']].append([row['thread_id'],row['id']])
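    # cores_dict maps core_id -> list of [thread_id, resource_id] pairs, e.g. (values illustrative):
    #   {0: [[0, 101], [16, 102]], 1: [[1, 103], [17, 104]]}  # two threads per core on a hyperthreaded host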
1495
1496 #In case full cores are requested
1497 paired = 'N'
1498 if requirements['numa']['proc_req_type'] == 'cores':
1499 #Get/create the list of the vcpu_ids
1500 vcpu_id_list = requirements['numa']['proc_req_list']
1501 if vcpu_id_list == None:
1502 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1503
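                #Hypothetical walk-through (assuming core 0 is visited first): with vcpu_id_list=[0,1] and
                #cores_dict={0: [[0, a], [8, b]], 1: [[1, c], [9, d]]} this loop yields
                #cpu_pinning=[[0, 0, a], [1, 1, c]] and reserved_threads=[b, d]; cores with a single free
                #thread are skipped because a full core is required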
1504 for threads in cores_dict.itervalues():
1505 #we need full cores
1506 if len(threads) != 2:
1507 continue
1508
1509 #set pinning for the first thread
1510 cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )
1511
1512                     #reserve the second thread so that it is not used
1513 reserved_threads.append(threads[1][1])
1514
1515 if len(vcpu_id_list) == 0:
1516 break
1517
1518 #In case paired threads are requested
1519 elif requirements['numa']['proc_req_type'] == 'paired-threads':
1520 paired = 'Y'
1521 #Get/create the list of the vcpu_ids
1522                 if requirements['numa']['proc_req_list'] is not None:
1523 vcpu_id_list = []
1524 for pair in requirements['numa']['proc_req_list']:
1525 if len(pair)!=2:
1526 return -1, "Field paired-threads-id not properly specified"
1528 vcpu_id_list.append(pair[0])
1529 vcpu_id_list.append(pair[1])
1530 else:
1531 vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))
1532
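                #Here both hardware threads of each selected core are pinned: two consecutive guest vcpus,
                #e.g. [0, 1] (hypothetical), land on the two threads of the same physical core, which is what
                #'paired-threads' means; no sibling thread needs to be reserved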
1533 for threads in cores_dict.itervalues():
1534 #we need full cores
1535 if len(threads) != 2:
1536 continue
1537 #set pinning for the first thread
1538 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1539
1540 #set pinning for the second thread
1541 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1542
1543 if len(vcpu_id_list) == 0:
1544 break
1545
1546 #In case normal threads are requested
1547 elif requirements['numa']['proc_req_type'] == 'threads':
1548 #Get/create the list of the vcpu_ids
1549 vcpu_id_list = requirements['numa']['proc_req_list']
1550                 if vcpu_id_list is None:
1551 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1552
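                #For plain 'threads' any free hardware thread can be used; cores are visited from the ones
                #with fewer free threads to the ones with more, so partially used cores are filled before
                #breaking into a fully free core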
1553 for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
1554 threads = cores_dict[threads_index]
1555 #set pinning for the first thread
1556 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1557
1558 #if exists, set pinning for the second thread
1559 if len(threads) == 2 and len(vcpu_id_list) != 0:
1560 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1561
1562 if len(vcpu_id_list) == 0:
1563 break
1564
1565 #Get the source pci addresses for the selected numa
1566 used_sriov_ports = []
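        #For every requested SR-IOV interface a free VF row of the same root physical port is picked,
        #e.g. (hypothetical values) {'id': 17, 'pci': '0000:05:10.1', 'mac': '52:54:00:aa:bb:01'}; its id is
        #remembered in used_sriov_ports so that two interfaces never get the same VF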
1567 for port in requirements['numa']['sriov_list']:
1568 db_lock.acquire()
1569 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1570 db_lock.release()
1571 if result <= 0:
1572 print content
1573 return -1, content
1574 for row in content:
1575 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1576 continue
1577 port['pci'] = row['pci']
1578 if 'mac_address' not in port:
1579 port['mac_address'] = row['mac']
1580 del port['mac']
1581 port['port_id']=row['id']
1582 port['Mbps_used'] = port['bandwidth']
1583 used_sriov_ports.append(row['id'])
1584 break
1585
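        #Physical (dedicated) interfaces: a plain 'yes' port is passed through whole and keeps its own mac,
        #while a 'yes:sriov' port reserves a single free VF but accounts the full port capacity (Mbps) as used,
        #presumably so that the physical port is not shared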
1586 for port in requirements['numa']['port_list']:
1587 port['Mbps_used'] = None
1588 if port['dedicated'] != "yes:sriov":
1589 port['mac_address'] = port['mac']
1590 del port['mac']
1591 continue
1592 db_lock.acquire()
1593 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1594 db_lock.release()
1595 if result <= 0:
1596 print content
1597 return -1, content
1598 port['Mbps_used'] = content[0]['Mbps']
1599 for row in content:
1600 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1601 continue
1602 port['pci'] = row['pci']
1603 if 'mac_address' not in port:
1604                             port['mac_address'] = row['mac']     #a custom mac cannot be set on passthrough ports, so the port's own mac is used
1605 del port['mac']
1606 port['port_id']=row['id']
1607 used_sriov_ports.append(row['id'])
1608 break
1609
1610 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1611 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1612
1613 server['host_id'] = host_id
1614
1615
1616 #Generate dictionary for saving in db the instance resources
1617 resources = {}
1618 resources['bridged-ifaces'] = []
1619
1620 numa_dict = {}
1621 numa_dict['interfaces'] = []
1622
1623 numa_dict['interfaces'] += requirements['numa']['port_list']
1624 numa_dict['interfaces'] += requirements['numa']['sriov_list']
1625
1626 #Check bridge information
1627 unified_dataplane_iface=[]
1628 unified_dataplane_iface += requirements['numa']['port_list']
1629 unified_dataplane_iface += requirements['numa']['sriov_list']
1630
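        #Each entry of server['networks'] is classified here: 'virtual' interfaces must point to a
        #bridge_man/bridge_data net and go to resources['bridged-ifaces']; dataplane interfaces (PF, VF,
        #VFnotShared) must point to a data/ptp net and are matched by name against the flavor numa
        #interfaces, checking consistency between the flavor 'dedicated' value and the requested type
        #(yes<->PF, no<->VF, yes:sriov<->VFnotShared)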
1631 for control_iface in server.get('networks', []):
1632 control_iface['net_id']=control_iface.pop('uuid')
1633             #Get the bridge name
1634 db_lock.acquire()
1635 result, content = db.get_table(FROM='nets', SELECT=('name','type', 'vlan'),WHERE={'uuid':control_iface['net_id']} )
1636 db_lock.release()
1637 if result < 0:
1638 pass
1639 elif result==0:
1640                 return -1, "Error at field networks: no network found with uuid %s" % control_iface['net_id']
1641 else:
1642 network=content[0]
1643 if control_iface.get("type", 'virtual') == 'virtual':
1644 if network['type']!='bridge_data' and network['type']!='bridge_man':
1645                         return -1, "Error at field networks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
1646 resources['bridged-ifaces'].append(control_iface)
1647 else:
1648 if network['type']!='data' and network['type']!='ptp':
1649                         return -1, "Error at field networks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
1650                     #dataplane interface: look for it in the numa tree and assign this network to it
1651 iface_found=False
1652 for dataplane_iface in numa_dict['interfaces']:
1653 if dataplane_iface['name'] == control_iface.get("name"):
1654 if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
1655 (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
1656 (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
1657                                 return -1, "Error at field networks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1658 (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
1659 dataplane_iface['uuid'] = control_iface['net_id']
1660 if dataplane_iface['dedicated'] == "no":
1661 dataplane_iface['vlan'] = network['vlan']
1662 if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
1663 dataplane_iface['mac_address'] = control_iface.get("mac_address")
1664 if control_iface.get("vpci"):
1665 dataplane_iface['vpci'] = control_iface.get("vpci")
1666 iface_found=True
1667 break
1668 if not iface_found:
1669                         return -1, "Error at field networks: interface name %s from network not found at flavor" % control_iface.get("name")
1670
1671 resources['host_id'] = host_id
1672 resources['image_id'] = server['image_id']
1673 resources['flavor_id'] = server['flavor_id']
1674 resources['tenant_id'] = server['tenant_id']
1675 resources['ram'] = requirements['ram']
1676 resources['vcpus'] = requirements['vcpus']
1677 resources['status'] = 'CREATING'
1678
1679 if 'description' in server: resources['description'] = server['description']
1680 if 'name' in server: resources['name'] = server['name']
1681
1682 resources['extended'] = {} #optional
1683 resources['extended']['numas'] = []
1684 numa_dict['numa_id'] = numa_id
1685 numa_dict['memory'] = requirements['numa']['memory']
1686 numa_dict['cores'] = []
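        #Pinned cores are stored as {'id': <resources_core id>, 'vthread': <guest vcpu>, 'paired': 'Y'/'N'};
        #reserved sibling threads carry only their 'id'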
1687
1688 for core in cpu_pinning:
1689 numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
1690 for core in reserved_threads:
1691 numa_dict['cores'].append({'id': core})
1692 resources['extended']['numas'].append(numa_dict)
1693 if extended!=None and 'devices' in extended: #TODO allow extra devices without numa
1694 resources['extended']['devices'] = extended['devices']
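        #resources now holds everything needed to register the instance (status 'CREATING'): the bridged
        #interfaces, the selected numa with its pinned cores and dataplane interfaces and, when present, the
        #extra devices from the extended description; it is returned to the caller, presumably for database insertion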
1695
1696
1697 print '===================================={'
1698 print json.dumps(resources, indent=4)
1699 print '====================================}'
1700
1701 return 0, resources
1702