0aba07755e879292d708435701b24a242f7a86c4
[osm/openvim.git] / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openmano
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
This is a thread that interacts with the host and with libvirt to manage VMs.
One thread will be launched per host.
27 '''
28 __author__="Pablo Montes, Alfonso Tierno"
29 __date__ ="$10-jul-2014 12:07:15$"
30
31
32 import json
33 import yaml
34 import threading
35 import time
36 import Queue
37 import paramiko
38 from jsonschema import validate as js_v, exceptions as js_e
39 import libvirt
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 #from logging import Logger
43 #import utils.auxiliary_functions as af
44
45 #TODO: insert a logging system
46
47 class host_thread(threading.Thread):
    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, develop_bridge_iface):
        '''Init a thread.
        Arguments:
            'name': name of thread
            'host','user': host ip or name to manage and user
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': when True the host is faked; no ssh/libvirt commands are issued
            'image_path': remote folder where VM image files are stored
            'host_id': uuid of this host at the database
            'version': openvim version string
            'develop_mode', 'develop_bridge_iface': relaxed mode for development and the
                bridge interface that dataplane interfaces are mapped to in that mode
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test
        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        self.xml_level = 0  # current indentation depth used by tab()/inc_tab()/dec_tab()
        #self.pending ={}

        self.server_status = {} #dictionary with pairs server_uuid:server_status
        self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
        self.next_update_server_status = 0 #time when must be check servers status

        self.hostinfo = None  # host description loaded from <image_path>/hostinfo.yaml; None until loaded

        self.queueLock = threading.Lock()   # protects accesses to taskQueue
        self.taskQueue = Queue.Queue(2000)  # pending tasks for this host thread
80
81 def ssh_connect(self):
82 try:
83 #Connect SSH
84 self.ssh_conn = paramiko.SSHClient()
85 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
86 self.ssh_conn.load_system_host_keys()
87 self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
88 except paramiko.ssh_exception.SSHException as e:
89 text = e.args[0]
90 print self.name, ": ssh_connect ssh Exception:", text
91
92 def load_localinfo(self):
93 if not self.test:
94 try:
95 #Connect SSH
96 self.ssh_connect()
97
98 command = 'mkdir -p ' + self.image_path
99 #print self.name, ': command:', command
100 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
101 content = stderr.read()
102 if len(content) > 0:
103 print self.name, ': command:', command, "stderr:", content
104
105 command = 'cat ' + self.image_path + '/.openvim.yaml'
106 #print self.name, ': command:', command
107 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
108 content = stdout.read()
109 if len(content) == 0:
110 print self.name, ': command:', command, "stderr:", stderr.read()
111 raise paramiko.ssh_exception.SSHException("Error empty file ")
112 self.localinfo = yaml.load(content)
113 js_v(self.localinfo, localinfo_schema)
114 self.localinfo_dirty=False
115 if 'server_files' not in self.localinfo:
116 self.localinfo['server_files'] = {}
117 print self.name, ': localinfo load from host'
118 return
119
120 except paramiko.ssh_exception.SSHException as e:
121 text = e.args[0]
122 print self.name, ": load_localinfo ssh Exception:", text
123 except libvirt.libvirtError as e:
124 text = e.get_error_message()
125 print self.name, ": load_localinfo libvirt Exception:", text
126 except yaml.YAMLError as exc:
127 text = ""
128 if hasattr(exc, 'problem_mark'):
129 mark = exc.problem_mark
130 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
131 print self.name, ": load_localinfo yaml format Exception", text
132 except js_e.ValidationError as e:
133 text = ""
134 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
135 print self.name, ": load_localinfo format Exception:", text, e.message
136 except Exception as e:
137 text = str(e)
138 print self.name, ": load_localinfo Exception:", text
139
140 #not loaded, insert a default data and force saving by activating dirty flag
141 self.localinfo = {'files':{}, 'server_files':{} }
142 #self.localinfo_dirty=True
143 self.localinfo_dirty=False
144
145 def load_hostinfo(self):
146 if self.test:
147 return;
148 try:
149 #Connect SSH
150 self.ssh_connect()
151
152
153 command = 'cat ' + self.image_path + '/hostinfo.yaml'
154 #print self.name, ': command:', command
155 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
156 content = stdout.read()
157 if len(content) == 0:
158 print self.name, ': command:', command, "stderr:", stderr.read()
159 raise paramiko.ssh_exception.SSHException("Error empty file ")
160 self.hostinfo = yaml.load(content)
161 js_v(self.hostinfo, hostinfo_schema)
162 print self.name, ': hostlinfo load from host', self.hostinfo
163 return
164
165 except paramiko.ssh_exception.SSHException as e:
166 text = e.args[0]
167 print self.name, ": load_hostinfo ssh Exception:", text
168 except libvirt.libvirtError as e:
169 text = e.get_error_message()
170 print self.name, ": load_hostinfo libvirt Exception:", text
171 except yaml.YAMLError as exc:
172 text = ""
173 if hasattr(exc, 'problem_mark'):
174 mark = exc.problem_mark
175 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
176 print self.name, ": load_hostinfo yaml format Exception", text
177 except js_e.ValidationError as e:
178 text = ""
179 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
180 print self.name, ": load_hostinfo format Exception:", text, e.message
181 except Exception as e:
182 text = str(e)
183 print self.name, ": load_hostinfo Exception:", text
184
185 #not loaded, insert a default data
186 self.hostinfo = None
187
188 def save_localinfo(self, tries=3):
189 if self.test:
190 self.localinfo_dirty = False
191 return
192
193 while tries>=0:
194 tries-=1
195
196 try:
197 command = 'cat > ' + self.image_path + '/.openvim.yaml'
198 print self.name, ': command:', command
199 (stdin, _, _) = self.ssh_conn.exec_command(command)
200 yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
201 self.localinfo_dirty = False
202 break #while tries
203
204 except paramiko.ssh_exception.SSHException as e:
205 text = e.args[0]
206 print self.name, ": save_localinfo ssh Exception:", text
207 if "SSH session not active" in text:
208 self.ssh_connect()
209 except libvirt.libvirtError as e:
210 text = e.get_error_message()
211 print self.name, ": save_localinfo libvirt Exception:", text
212 except yaml.YAMLError as exc:
213 text = ""
214 if hasattr(exc, 'problem_mark'):
215 mark = exc.problem_mark
216 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
217 print self.name, ": save_localinfo yaml format Exception", text
218 except Exception as e:
219 text = str(e)
220 print self.name, ": save_localinfo Exception:", text
221
    def load_servers_from_db(self):
        '''Refresh self.server_status from the instances table of this host.

        Also migrates the legacy 'inc_files' localinfo layout to the newer
        'server_files' layout, marking localinfo dirty when that happens.
        '''
        self.db_lock.acquire()
        r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
        self.db_lock.release()

        self.server_status = {}
        if r<0:
            print self.name, ": Error getting data from database:", c
            return
        for server in c:
            self.server_status[ server['uuid'] ] = server['status']

            #convert from old version to new one
            if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
                # old layout stored only (path, ...) per server; assume raw unless the
                # filename extension says qcow2
                server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
                if server_files_dict['source file'][-5:] == 'qcow2':
                    server_files_dict['file format'] = 'qcow2'

                self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
        # once migrated (or when servers are gone) drop the legacy key so that the
        # file is rewritten in the new format
        if 'inc_files' in self.localinfo:
            del self.localinfo['inc_files']
            self.localinfo_dirty = True
244
245 def delete_unused_files(self):
246 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
247 Deletes unused entries at self.loacalinfo and the corresponding local files.
248 The only reason for this mismatch is the manual deletion of instances (VM) at database
249 '''
250 if self.test:
251 return
252 for uuid,images in self.localinfo['server_files'].items():
253 if uuid not in self.server_status:
254 for localfile in images.values():
255 try:
256 print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid)
257 self.delete_file(localfile['source file'])
258 except paramiko.ssh_exception.SSHException as e:
259 print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e))
260 del self.localinfo['server_files'][uuid]
261 self.localinfo_dirty = True
262
263 def insert_task(self, task, *aditional):
264 try:
265 self.queueLock.acquire()
266 task = self.taskQueue.put( (task,) + aditional, timeout=5)
267 self.queueLock.release()
268 return 1, None
269 except Queue.Full:
270 return -1, "timeout inserting a task over host " + self.name
271
    def run(self):
        '''Main loop of the host thread.

        Outer loop: (re)load state from the host and the database; a 'reload'
        task breaks the inner loop to come back here.
        Inner loop: pop one task from taskQueue; when idle, do housekeeping
        (save dirty localinfo, refresh server status every 5s, force-off
        servers whose terminate grace time expired) or sleep.
        '''
        while True:
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                self.queueLock.acquire()
                if not self.taskQueue.empty():
                    task = self.taskQueue.get()
                else:
                    task = None
                self.queueLock.release()

                if task is None:
                    # idle housekeeping: only one action per pass, cheapest first
                    now=time.time()
                    if self.localinfo_dirty:
                        self.save_localinfo()
                    elif self.next_update_server_status < now:
                        self.update_servers_status()
                        self.next_update_server_status = now + 5
                    elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                        self.server_forceoff()
                    else:
                        time.sleep(1)
                    continue

                # task is a tuple built by insert_task(); task[0] selects the action
                if task[0] == 'instance':
                    print self.name, ": processing task instance", task[1]['action']
                    # try twice; the second (last) attempt is flagged so that
                    # action_on_server can report the error definitively
                    retry=0
                    while retry <2:
                        retry += 1
                        r=self.action_on_server(task[1], retry==2)
                        if r>=0:
                            break
                elif task[0] == 'image':
                    pass
                elif task[0] == 'exit':
                    print self.name, ": processing task exit"
                    self.terminate()
                    return 0
                elif task[0] == 'reload':
                    print self.name, ": processing task reload terminating and relaunching"
                    self.terminate()
                    break  # back to the outer loop to reload all state
                elif task[0] == 'edit-iface':
                    print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3])
                    self.edit_iface(task[1], task[2], task[3])
                elif task[0] == 'restore-iface':
                    print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2])
                    self.restore_iface(task[1], task[2])
                else:
                    print self.name, ": unknown task", task
325
326 def server_forceoff(self, wait_until_finished=False):
327 while len(self.pending_terminate_server)>0:
328 now = time.time()
329 if self.pending_terminate_server[0][0]>now:
330 if wait_until_finished:
331 time.sleep(1)
332 continue
333 else:
334 return
335 req={'uuid':self.pending_terminate_server[0][1],
336 'action':{'terminate':'force'},
337 'status': None
338 }
339 self.action_on_server(req)
340 self.pending_terminate_server.pop(0)
341
342 def terminate(self):
343 try:
344 self.server_forceoff(True)
345 if self.localinfo_dirty:
346 self.save_localinfo()
347 if not self.test:
348 self.ssh_conn.close()
349 except Exception as e:
350 text = str(e)
351 print self.name, ": terminate Exception:", text
352 print self.name, ": exit from host_thread"
353
354 def get_local_iface_name(self, generic_name):
355 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
356 return self.hostinfo["iface_names"][generic_name]
357 return generic_name
358
359 def create_xml_server(self, server, dev_list, server_metadata={}):
360 """Function that implements the generation of the VM XML definition.
361 Additional devices are in dev_list list
362 The main disk is upon dev_list[0]"""
363
364 #get if operating system is Windows
365 windows_os = False
366 os_type = server_metadata.get('os_type', None)
367 if os_type == None and 'metadata' in dev_list[0]:
368 os_type = dev_list[0]['metadata'].get('os_type', None)
369 if os_type != None and os_type.lower() == "windows":
370 windows_os = True
371 #get type of hard disk bus
372 bus_ide = True if windows_os else False
373 bus = server_metadata.get('bus', None)
374 if bus == None and 'metadata' in dev_list[0]:
375 bus = dev_list[0]['metadata'].get('bus', None)
376 if bus != None:
377 bus_ide = True if bus=='ide' else False
378
379 self.xml_level = 0
380
381 text = "<domain type='kvm'>"
382 #get topology
383 topo = server_metadata.get('topology', None)
384 if topo == None and 'metadata' in dev_list[0]:
385 topo = dev_list[0]['metadata'].get('topology', None)
386 #name
387 name = server.get('name','') + "_" + server['uuid']
388 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
389 text += self.inc_tab() + "<name>" + name+ "</name>"
390 #uuid
391 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
392
393 numa={}
394 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
395 numa = server['extended']['numas'][0]
396 #memory
397 use_huge = False
398 memory = int(numa.get('memory',0))*1024*1024 #in KiB
399 if memory==0:
400 memory = int(server['ram'])*1024;
401 else:
402 if not self.develop_mode:
403 use_huge = True
404 if memory==0:
405 return -1, 'No memory assigned to instance'
406 memory = str(memory)
407 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
408 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
409 if use_huge:
410 text += self.tab()+'<memoryBacking>'+ \
411 self.inc_tab() + '<hugepages/>'+ \
412 self.dec_tab()+ '</memoryBacking>'
413
414 #cpu
415 use_cpu_pinning=False
416 vcpus = int(server.get("vcpus",0))
417 cpu_pinning = []
418 if 'cores-source' in numa:
419 use_cpu_pinning=True
420 for index in range(0, len(numa['cores-source'])):
421 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
422 vcpus += 1
423 if 'threads-source' in numa:
424 use_cpu_pinning=True
425 for index in range(0, len(numa['threads-source'])):
426 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
427 vcpus += 1
428 if 'paired-threads-source' in numa:
429 use_cpu_pinning=True
430 for index in range(0, len(numa['paired-threads-source'])):
431 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
432 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
433 vcpus += 2
434
435 if use_cpu_pinning and not self.develop_mode:
436 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
437 self.tab()+'<cputune>'
438 self.xml_level += 1
439 for i in range(0, len(cpu_pinning)):
440 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
441 text += self.dec_tab()+'</cputune>'+ \
442 self.tab() + '<numatune>' +\
443 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
444 self.dec_tab() + '</numatune>'
445 else:
446 if vcpus==0:
447 return -1, "Instance without number of cpus"
448 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
449
450 #boot
451 boot_cdrom = False
452 for dev in dev_list:
453 if dev['type']=='cdrom' :
454 boot_cdrom = True
455 break
456 text += self.tab()+ '<os>' + \
457 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
458 if boot_cdrom:
459 text += self.tab() + "<boot dev='cdrom'/>"
460 text += self.tab() + "<boot dev='hd'/>" + \
461 self.dec_tab()+'</os>'
462 #features
463 text += self.tab()+'<features>'+\
464 self.inc_tab()+'<acpi/>' +\
465 self.tab()+'<apic/>' +\
466 self.tab()+'<pae/>'+ \
467 self.dec_tab() +'</features>'
468 if windows_os or topo=="oneSocket":
469 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
470 else:
471 text += self.tab() + "<cpu mode='host-model'></cpu>"
472 text += self.tab() + "<clock offset='utc'/>" +\
473 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
474 self.tab() + "<on_reboot>restart</on_reboot>" + \
475 self.tab() + "<on_crash>restart</on_crash>"
476 text += self.tab() + "<devices>" + \
477 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
478 self.tab() + "<serial type='pty'>" +\
479 self.inc_tab() + "<target port='0'/>" + \
480 self.dec_tab() + "</serial>" +\
481 self.tab() + "<console type='pty'>" + \
482 self.inc_tab()+ "<target type='serial' port='0'/>" + \
483 self.dec_tab()+'</console>'
484 if windows_os:
485 text += self.tab() + "<controller type='usb' index='0'/>" + \
486 self.tab() + "<controller type='ide' index='0'/>" + \
487 self.tab() + "<input type='mouse' bus='ps2'/>" + \
488 self.tab() + "<sound model='ich6'/>" + \
489 self.tab() + "<video>" + \
490 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
491 self.dec_tab() + "</video>" + \
492 self.tab() + "<memballoon model='virtio'/>" + \
493 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
494
495 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
496 #> self.dec_tab()+'</hostdev>\n' +\
497 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
498 if windows_os:
499 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
500 else:
501 #If image contains 'GRAPH' include graphics
502 #if 'GRAPH' in image:
503 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
504 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
505 self.dec_tab() + "</graphics>"
506
507 vd_index = 'a'
508 for dev in dev_list:
509 bus_ide_dev = bus_ide
510 if dev['type']=='cdrom' or dev['type']=='disk':
511 if dev['type']=='cdrom':
512 bus_ide_dev = True
513 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
514 if 'file format' in dev:
515 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
516 if 'source file' in dev:
517 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
518 #elif v['type'] == 'block':
519 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
520 #else:
521 # return -1, 'Unknown disk type ' + v['type']
522 vpci = dev.get('vpci',None)
523 if vpci == None:
524 vpci = dev['metadata'].get('vpci',None)
525 text += self.pci2xml(vpci)
526
527 if bus_ide_dev:
528 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
529 else:
530 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
531 text += self.dec_tab() + '</disk>'
532 vd_index = chr(ord(vd_index)+1)
533 elif dev['type']=='xml':
534 dev_text = dev['xml']
535 if 'vpci' in dev:
536 dev_text = dev_text.replace('__vpci__', dev['vpci'])
537 if 'source file' in dev:
538 dev_text = dev_text.replace('__file__', dev['source file'])
539 if 'file format' in dev:
540 dev_text = dev_text.replace('__format__', dev['source file'])
541 if '__dev__' in dev_text:
542 dev_text = dev_text.replace('__dev__', vd_index)
543 vd_index = chr(ord(vd_index)+1)
544 text += dev_text
545 else:
546 return -1, 'Unknown device type ' + dev['type']
547
548 net_nb=0
549 bridge_interfaces = server.get('networks', [])
550 for v in bridge_interfaces:
551 #Get the brifge name
552 self.db_lock.acquire()
553 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
554 self.db_lock.release()
555 if result <= 0:
556 print "create_xml_server ERROR getting nets",result, content
557 return -1, content
558 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
559 #I know it is not secure
560 #for v in sorted(desc['network interfaces'].itervalues()):
561 model = v.get("model", None)
562 if content[0]['provider']=='default':
563 text += self.tab() + "<interface type='network'>" + \
564 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
565 elif content[0]['provider'][0:7]=='macvtap':
566 text += self.tab()+"<interface type='direct'>" + \
567 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
568 self.tab() + "<target dev='macvtap0'/>"
569 if windows_os:
570 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
571 elif model==None:
572 model = "virtio"
573 elif content[0]['provider'][0:6]=='bridge':
574 text += self.tab() + "<interface type='bridge'>" + \
575 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
576 if windows_os:
577 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
578 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
579 elif model==None:
580 model = "virtio"
581 else:
582 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
583 if model!=None:
584 text += self.tab() + "<model type='" +model+ "'/>"
585 if v.get('mac_address', None) != None:
586 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
587 text += self.pci2xml(v.get('vpci',None))
588 text += self.dec_tab()+'</interface>'
589
590 net_nb += 1
591
592 interfaces = numa.get('interfaces', [])
593
594 net_nb=0
595 for v in interfaces:
596 if self.develop_mode: #map these interfaces to bridges
597 text += self.tab() + "<interface type='bridge'>" + \
598 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
599 if windows_os:
600 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
601 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
602 else:
603 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
604 if v.get('mac_address', None) != None:
605 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
606 text += self.pci2xml(v.get('vpci',None))
607 text += self.dec_tab()+'</interface>'
608 continue
609
610 if v['dedicated'] == 'yes': #passthrought
611 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
612 self.inc_tab() + "<source>"
613 self.inc_tab()
614 text += self.pci2xml(v['source'])
615 text += self.dec_tab()+'</source>'
616 text += self.pci2xml(v.get('vpci',None))
617 if windows_os:
618 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
619 text += self.dec_tab()+'</hostdev>'
620 net_nb += 1
621 else: #sriov_interfaces
622 #skip not connected interfaces
623 if v.get("net_id") == None:
624 continue
625 text += self.tab() + "<interface type='hostdev' managed='yes'>"
626 self.inc_tab()
627 if v.get('mac_address', None) != None:
628 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
629 text+= self.tab()+'<source>'
630 self.inc_tab()
631 text += self.pci2xml(v['source'])
632 text += self.dec_tab()+'</source>'
633 if v.get('vlan',None) != None:
634 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
635 text += self.pci2xml(v.get('vpci',None))
636 if windows_os:
637 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
638 text += self.dec_tab()+'</interface>'
639
640
641 text += self.dec_tab()+'</devices>'+\
642 self.dec_tab()+'</domain>'
643 return 0, text
644
645 def pci2xml(self, pci):
646 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
647 alows an empty pci text'''
648 if pci is None:
649 return ""
650 first_part = pci.split(':')
651 second_part = first_part[2].split('.')
652 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
653 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
654 "' function='0x" + second_part[1] + "'/>"
655
656 def tab(self):
657 """Return indentation according to xml_level"""
658 return "\n" + (' '*self.xml_level)
659
660 def inc_tab(self):
661 """Increment and return indentation according to xml_level"""
662 self.xml_level += 1
663 return self.tab()
664
665 def dec_tab(self):
666 """Decrement and return indentation according to xml_level"""
667 self.xml_level -= 1
668 return self.tab()
669
670 def get_file_info(self, path):
671 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
672 print self.name, ': command:', command
673 (_, stdout, _) = self.ssh_conn.exec_command(command)
674 content = stdout.read()
675 if len(content) == 0:
676 return None # file does not exist
677 else:
678 return content.split(" ") #(permission, 1, owner, group, size, date, file)
679
680 def qemu_get_info(self, path):
681 command = 'qemu-img info ' + path
682 print self.name, ': command:', command
683 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
684 content = stdout.read()
685 if len(content) == 0:
686 error = stderr.read()
687 print self.name, ": get_qemu_info error ", error
688 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
689 else:
690 try:
691 return yaml.load(content)
692 except yaml.YAMLError as exc:
693 text = ""
694 if hasattr(exc, 'problem_mark'):
695 mark = exc.problem_mark
696 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
697 print self.name, ": get_qemu_info yaml format Exception", text
698 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
699
700 def qemu_change_backing(self, inc_file, new_backing_file):
701 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
702 print self.name, ': command:', command
703 (_, _, stderr) = self.ssh_conn.exec_command(command)
704 content = stderr.read()
705 if len(content) == 0:
706 return 0
707 else:
708 print self.name, ": qemu_change_backing error: ", content
709 return -1
710
711 def get_notused_filename(self, proposed_name, suffix=''):
712 '''Look for a non existing file_name in the host
713 proposed_name: proposed file name, includes path
714 suffix: suffix to be added to the name, before the extention
715 '''
716 extension = proposed_name.rfind(".")
717 slash = proposed_name.rfind("/")
718 if extension < 0 or extension < slash: # no extension
719 extension = len(proposed_name)
720 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
721 info = self.get_file_info(target_name)
722 if info is None:
723 return target_name
724
725 index=0
726 while info is not None:
727 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
728 index+=1
729 info = self.get_file_info(target_name)
730 return target_name
731
732 def get_notused_path(self, proposed_path, suffix=''):
733 '''Look for a non existing path at database for images
734 proposed_path: proposed file name, includes path
735 suffix: suffix to be added to the name, before the extention
736 '''
737 extension = proposed_path.rfind(".")
738 if extension < 0:
739 extension = len(proposed_path)
740 if suffix != None:
741 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
742 index=0
743 while True:
744 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
745 if r<=0:
746 return target_path
747 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
748 index+=1
749
750
751 def delete_file(self, file_name):
752 command = 'rm -f '+file_name
753 print self.name, ': command:', command
754 (_, _, stderr) = self.ssh_conn.exec_command(command)
755 error_msg = stderr.read()
756 if len(error_msg) > 0:
757 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
758
759 def copy_file(self, source, destination, perserve_time=True):
760 command = 'cp --no-preserve=mode '
761 if perserve_time: command += '--preserve=timestamps '
762 command += source + ' ' + destination
763 print self.name, ': command:', command
764 (_, _, stderr) = self.ssh_conn.exec_command(command)
765 error_msg = stderr.read()
766 if len(error_msg) > 0:
767 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
768
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to local folder and recursively
        copy the backing files in case the remote file is incremental
        Read and/or modified self.localinfo['files'] that contain the
        unmodified copies of images in the local path
        params:
            remote_file: path of remote file
            use_incremental: None (leave the decision to this function), True, False
        return:
            local_file: name of local file
            qemu_info: dict with quemu information of local file
            use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''

        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not 'backing file' in qemu_remote_info
        #copy recursivelly the backing files
        if 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None
            # fields 4 and 5 of get_file_info() are size and modification date
            elif local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]:
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass  # best effort: a fresh copy is made below anyway
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None


        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            # only unmodified (incremental-base) copies are cached in localinfo
            if use_incremental_out:
                self.localinfo['files'][remote_file] = local_file
            # repoint the copy at the locally copied backing file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
831
832 def launch_server(self, conn, server, rebuild=False, domain=None):
833 if self.test:
834 time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
835 return 0, 'Success'
836
837 server_id = server['uuid']
838 paused = server.get('paused','no')
839 try:
840 if domain!=None and rebuild==False:
841 domain.resume()
842 #self.server_status[server_id] = 'ACTIVE'
843 return 0, 'Success'
844
845 self.db_lock.acquire()
846 result, server_data = self.db.get_instance(server_id)
847 self.db_lock.release()
848 if result <= 0:
849 print self.name, ": launch_server ERROR getting server from DB",result, server_data
850 return result, server_data
851
852 #0: get image metadata
853 server_metadata = server.get('metadata', {})
854 use_incremental = None
855
856 if "use_incremental" in server_metadata:
857 use_incremental = False if server_metadata["use_incremental"]=="no" else True
858
859 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
860 if rebuild:
861 #delete previous incremental files
862 for file_ in server_host_files.values():
863 self.delete_file(file_['source file'] )
864 server_host_files={}
865
866 #1: obtain aditional devices (disks)
867 #Put as first device the main disk
868 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
869 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
870 devices += server_data['extended']['devices']
871
872 for dev in devices:
873 if dev['image_id'] == None:
874 continue
875
876 self.db_lock.acquire()
877 result, content = self.db.get_table(FROM='images', SELECT=('path','metadata'),WHERE={'uuid':dev['image_id']} )
878 self.db_lock.release()
879 if result <= 0:
880 error_text = "ERROR", result, content, "when getting image", dev['image_id']
881 print self.name, ": launch_server", error_text
882 return -1, error_text
883 if content[0]['metadata'] is not None:
884 dev['metadata'] = json.loads(content[0]['metadata'])
885 else:
886 dev['metadata'] = {}
887
888 if dev['image_id'] in server_host_files:
889 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
890 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
891 continue
892
893 #2: copy image to host
894 remote_file = content[0]['path']
895 use_incremental_image = use_incremental
896 if dev['metadata'].get("use_incremental") == "no":
897 use_incremental_image = False
898 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
899
900 #create incremental image
901 if use_incremental_image:
902 local_file_inc = self.get_notused_filename(local_file, '.inc')
903 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
904 print 'command:', command
905 (_, _, stderr) = self.ssh_conn.exec_command(command)
906 error_msg = stderr.read()
907 if len(error_msg) > 0:
908 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
909 local_file = local_file_inc
910 qemu_info = {'file format':'qcow2'}
911
912 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
913
914 dev['source file'] = local_file
915 dev['file format'] = qemu_info['file format']
916
917 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
918 self.localinfo_dirty = True
919
920 #3 Create XML
921 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
922 if result <0:
923 print self.name, ": create xml server error:", xml
924 return -2, xml
925 print self.name, ": create xml:", xml
926 atribute = libvirt.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
927 #4 Start the domain
928 if not rebuild: #ensures that any pending destroying server is done
929 self.server_forceoff(True)
930 #print self.name, ": launching instance" #, xml
931 conn.createXML(xml, atribute)
932 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
933
934 return 0, 'Success'
935
936 except paramiko.ssh_exception.SSHException as e:
937 text = e.args[0]
938 print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
939 if "SSH session not active" in text:
940 self.ssh_connect()
941 except libvirt.libvirtError as e:
942 text = e.get_error_message()
943 print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
944 except Exception as e:
945 text = str(e)
946 print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
947 return -1, text
948
949 def update_servers_status(self):
950 # # virDomainState
951 # VIR_DOMAIN_NOSTATE = 0
952 # VIR_DOMAIN_RUNNING = 1
953 # VIR_DOMAIN_BLOCKED = 2
954 # VIR_DOMAIN_PAUSED = 3
955 # VIR_DOMAIN_SHUTDOWN = 4
956 # VIR_DOMAIN_SHUTOFF = 5
957 # VIR_DOMAIN_CRASHED = 6
958 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
959
960 if self.test or len(self.server_status)==0:
961 return
962
963 try:
964 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
965 domains= conn.listAllDomains()
966 domain_dict={}
967 for domain in domains:
968 uuid = domain.UUIDString() ;
969 libvirt_status = domain.state()
970 #print libvirt_status
971 if libvirt_status[0] == libvirt.VIR_DOMAIN_RUNNING or libvirt_status[0] == libvirt.VIR_DOMAIN_SHUTDOWN:
972 new_status = "ACTIVE"
973 elif libvirt_status[0] == libvirt.VIR_DOMAIN_PAUSED:
974 new_status = "PAUSED"
975 elif libvirt_status[0] == libvirt.VIR_DOMAIN_SHUTOFF:
976 new_status = "INACTIVE"
977 elif libvirt_status[0] == libvirt.VIR_DOMAIN_CRASHED:
978 new_status = "ERROR"
979 else:
980 new_status = None
981 domain_dict[uuid] = new_status
982 conn.close
983 except libvirt.libvirtError as e:
984 print self.name, ": get_state() Exception '", e.get_error_message()
985 return
986
987 for server_id, current_status in self.server_status.iteritems():
988 new_status = None
989 if server_id in domain_dict:
990 new_status = domain_dict[server_id]
991 else:
992 new_status = "INACTIVE"
993
994 if new_status == None or new_status == current_status:
995 continue
996 if new_status == 'INACTIVE' and current_status == 'ERROR':
997 continue #keep ERROR status, because obviously this machine is not running
998 #change status
999 print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
1000 STATUS={'progress':100, 'status':new_status}
1001 if new_status == 'ERROR':
1002 STATUS['last_error'] = 'machine has crashed'
1003 self.db_lock.acquire()
1004 r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
1005 self.db_lock.release()
1006 if r>=0:
1007 self.server_status[server_id] = new_status
1008
1009 def action_on_server(self, req, last_retry=True):
1010 '''Perform an action on a req
1011 Attributes:
1012 req: dictionary that contain:
1013 server properties: 'uuid','name','tenant_id','status'
1014 action: 'action'
1015 host properties: 'user', 'ip_name'
1016 return (error, text)
1017 0: No error. VM is updated to new state,
1018 -1: Invalid action, as trying to pause a PAUSED VM
1019 -2: Error accessing host
1020 -3: VM nor present
1021 -4: Error at DB access
1022 -5: Error while trying to perform action. VM is updated to ERROR
1023 '''
1024 server_id = req['uuid']
1025 conn = None
1026 new_status = None
1027 old_status = req['status']
1028 last_error = None
1029
1030 if self.test:
1031 if 'terminate' in req['action']:
1032 new_status = 'deleted'
1033 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1034 if req['status']!='ERROR':
1035 time.sleep(5)
1036 new_status = 'INACTIVE'
1037 elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1038 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
1039 elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
1040 elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1041 elif 'rebuild' in req['action']:
1042 time.sleep(random.randint(20,150))
1043 new_status = 'ACTIVE'
1044 elif 'createImage' in req['action']:
1045 time.sleep(5)
1046 self.create_image(None, req)
1047 else:
1048 try:
1049 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1050 try:
1051 dom = conn.lookupByUUIDString(server_id)
1052 except libvirt.libvirtError as e:
1053 text = e.get_error_message()
1054 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1055 dom = None
1056 else:
1057 print self.name, ": action_on_server(",server_id,") libvirt exception:", text
1058 raise e
1059
1060 if 'forceOff' in req['action']:
1061 if dom == None:
1062 print self.name, ": action_on_server(",server_id,") domain not running"
1063 else:
1064 try:
1065 print self.name, ": sending DESTROY to server", server_id
1066 dom.destroy()
1067 except Exception as e:
1068 if "domain is not running" not in e.get_error_message():
1069 print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
1070 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1071 new_status = 'ERROR'
1072
1073 elif 'terminate' in req['action']:
1074 if dom == None:
1075 print self.name, ": action_on_server(",server_id,") domain not running"
1076 new_status = 'deleted'
1077 else:
1078 try:
1079 if req['action']['terminate'] == 'force':
1080 print self.name, ": sending DESTROY to server", server_id
1081 dom.destroy()
1082 new_status = 'deleted'
1083 else:
1084 print self.name, ": sending SHUTDOWN to server", server_id
1085 dom.shutdown()
1086 self.pending_terminate_server.append( (time.time()+10,server_id) )
1087 except Exception as e:
1088 print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
1089 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1090 new_status = 'ERROR'
1091 if "domain is not running" in e.get_error_message():
1092 try:
1093 dom.undefine()
1094 new_status = 'deleted'
1095 except Exception:
1096 print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
1097 last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
1098 #Exception: 'virDomainDetachDevice() failed'
1099 if new_status=='deleted':
1100 if server_id in self.server_status:
1101 del self.server_status[server_id]
1102 if req['uuid'] in self.localinfo['server_files']:
1103 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1104 try:
1105 self.delete_file(file_['source file'])
1106 except Exception:
1107 pass
1108 del self.localinfo['server_files'][ req['uuid'] ]
1109 self.localinfo_dirty = True
1110
1111 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1112 try:
1113 if dom == None:
1114 print self.name, ": action_on_server(",server_id,") domain not running"
1115 else:
1116 dom.shutdown()
1117 # new_status = 'INACTIVE'
1118 #TODO: check status for changing at database
1119 except Exception as e:
1120 new_status = 'ERROR'
1121 print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
1122 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1123
1124 elif 'rebuild' in req['action']:
1125 if dom != None:
1126 dom.destroy()
1127 r = self.launch_server(conn, req, True, None)
1128 if r[0] <0:
1129 new_status = 'ERROR'
1130 last_error = r[1]
1131 else:
1132 new_status = 'ACTIVE'
1133 elif 'start' in req['action']:
1134 #La instancia está sólo en la base de datos pero no en la libvirt. es necesario crearla
1135 rebuild = True if req['action']['start']=='rebuild' else False
1136 r = self.launch_server(conn, req, rebuild, dom)
1137 if r[0] <0:
1138 new_status = 'ERROR'
1139 last_error = r[1]
1140 else:
1141 new_status = 'ACTIVE'
1142
1143 elif 'resume' in req['action']:
1144 try:
1145 if dom == None:
1146 pass
1147 else:
1148 dom.resume()
1149 # new_status = 'ACTIVE'
1150 except Exception as e:
1151 print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()
1152
1153 elif 'pause' in req['action']:
1154 try:
1155 if dom == None:
1156 pass
1157 else:
1158 dom.suspend()
1159 # new_status = 'PAUSED'
1160 except Exception as e:
1161 print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()
1162
1163 elif 'reboot' in req['action']:
1164 try:
1165 if dom == None:
1166 pass
1167 else:
1168 dom.reboot()
1169 print self.name, ": action_on_server(",server_id,") reboot:"
1170 #new_status = 'ACTIVE'
1171 except Exception as e:
1172 print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
1173 elif 'createImage' in req['action']:
1174 self.create_image(dom, req)
1175
1176
1177 conn.close()
1178 except libvirt.libvirtError as e:
1179 if conn is not None: conn.close
1180 text = e.get_error_message()
1181 new_status = "ERROR"
1182 last_error = text
1183 print self.name, ": action_on_server(",server_id,") Exception '", text
1184 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1185 print self.name, ": action_on_server(",server_id,") Exception removed from host"
1186 #end of if self.test
1187 if new_status == None:
1188 return 1
1189
1190 print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
1191 UPDATE = {'progress':100, 'status':new_status}
1192
1193 if new_status=='ERROR':
1194 if not last_retry: #if there will be another retry do not update database
1195 return -1
1196 elif 'terminate' in req['action']:
1197 #PUT a log in the database
1198 print self.name, ": PANIC deleting server", server_id, last_error
1199 self.db_lock.acquire()
1200 self.db.new_row('logs',
1201 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1202 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1203 )
1204 self.db_lock.release()
1205 if server_id in self.server_status:
1206 del self.server_status[server_id]
1207 return -1
1208 else:
1209 UPDATE['last_error'] = last_error
1210 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1211 self.db_lock.acquire()
1212 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1213 self.server_status[server_id] = new_status
1214 self.db_lock.release()
1215 if new_status == 'ERROR':
1216 return -1
1217 return 1
1218
1219
1220 def restore_iface(self, name, mac, lib_conn=None):
1221 ''' make an ifdown, ifup to restore default parameter of na interface
1222 Params:
1223 mac: mac address of the interface
1224 lib_conn: connection to the libvirt, if None a new connection is created
1225 Return 0,None if ok, -1,text if fails
1226 '''
1227 conn=None
1228 ret = 0
1229 error_text=None
1230 if self.test:
1231 print self.name, ": restore_iface '%s' %s" % (name, mac)
1232 return 0, None
1233 try:
1234 if not lib_conn:
1235 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1236 else:
1237 conn = lib_conn
1238
1239 #wait to the pending VM deletion
1240 #TODO.Revise self.server_forceoff(True)
1241
1242 iface = conn.interfaceLookupByMACString(mac)
1243 iface.destroy()
1244 iface.create()
1245 print self.name, ": restore_iface '%s' %s" % (name, mac)
1246 except libvirt.libvirtError as e:
1247 error_text = e.get_error_message()
1248 print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
1249 ret=-1
1250 finally:
1251 if lib_conn is None and conn is not None:
1252 conn.close
1253 return ret, error_text
1254
1255
    def create_image(self,dom, req):
        '''Create a new image from a server disk and update the image entry at
        database with the result.
        Params:
            dom: libvirt domain of the server (not used by this method's body;
                 kept for interface compatibility with the caller)
            req: request dictionary; uses req['action']['createImage'] (with
                 'source', optional 'path' and 'name') and req['new_image']['uuid']
        '''
        if self.test:
            # test mode: only compute the destination path, no real copy
            if 'path' in req['action']['createImage']:
                file_dst = req['action']['createImage']['path']
            else:
                createImage=req['action']['createImage']
                img_name= createImage['source']['path']
                index=img_name.rfind('/')
                # NOTE(review): this branch calls get_notused_path while the
                # real branch below calls get_notused_filename -- confirm both
                # helpers exist, this may be a typo
                file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
            image_status='ACTIVE'
        else:
            # retry once, reconnecting the SSH session if it went down
            for retry in (0,1):
                try:
                    server_id = req['uuid']
                    createImage=req['action']['createImage']
                    # local disk file of the source image at this host
                    file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
                    if 'path' in req['action']['createImage']:
                        file_dst = req['action']['createImage']['path']
                    else:
                        # derive destination from the source path and the new image name
                        img_name= createImage['source']['path']
                        index=img_name.rfind('/')
                        file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')

                    self.copy_file(file_orig, file_dst)
                    qemu_info = self.qemu_get_info(file_orig)
                    if 'backing file' in qemu_info:
                        # re-point the copy's backing file to the original remote
                        # path (localinfo['files'] maps remote -> local paths)
                        for k,v in self.localinfo['files'].items():
                            if v==qemu_info['backing file']:
                                self.qemu_change_backing(file_dst, k)
                                break
                    image_status='ACTIVE'
                    break
                except paramiko.ssh_exception.SSHException as e:
                    image_status='ERROR'
                    error_text = e.args[0]
                    print self.name, "': create_image(",server_id,") ssh Exception:", error_text
                    if "SSH session not active" in error_text and retry==0:
                        self.ssh_connect()
                except Exception as e:
                    image_status='ERROR'
                    error_text = str(e)
                    print self.name, "': create_image(",server_id,") Exception:", error_text

        # NOTE(review): if both retries fail before file_dst is assigned, the
        # update below raises NameError -- confirm and guard
        #TODO insert a last_error at database
        self.db_lock.acquire()
        self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
                {'uuid':req['new_image']['uuid']}, log=True)
        self.db_lock.release()
1304
1305 def edit_iface(self, port_id, old_net, new_net):
1306 #This action imply remove and insert interface to put proper parameters
1307 if self.test:
1308 time.sleep(1)
1309 else:
1310 #get iface details
1311 self.db_lock.acquire()
1312 r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
1313 WHERE={'port_id': port_id})
1314 self.db_lock.release()
1315 if r<0:
1316 print self.name, ": edit_iface(",port_id,") DDBB error:", c
1317 return
1318 elif r==0:
1319 print self.name, ": edit_iface(",port_id,") por not found"
1320 return
1321 port=c[0]
1322 if port["model"]!="VF":
1323 print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
1324 return
1325 #create xml detach file
1326 xml=[]
1327 self.xml_level = 2
1328 xml.append("<interface type='hostdev' managed='yes'>")
1329 xml.append(" <mac address='" +port['mac']+ "'/>")
1330 xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
1331 xml.append('</interface>')
1332
1333
1334 try:
1335 conn=None
1336 conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1337 dom = conn.lookupByUUIDString(port["instance_id"])
1338 if old_net:
1339 text="\n".join(xml)
1340 print self.name, ": edit_iface detaching SRIOV interface", text
1341 dom.detachDeviceFlags(text, flags=libvirt.VIR_DOMAIN_AFFECT_LIVE)
1342 if new_net:
1343 xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
1344 self.xml_level = 1
1345 xml.append(self.pci2xml(port.get('vpci',None)) )
1346 xml.append('</interface>')
1347 text="\n".join(xml)
1348 print self.name, ": edit_iface attaching SRIOV interface", text
1349 dom.attachDeviceFlags(text, flags=libvirt.VIR_DOMAIN_AFFECT_LIVE)
1350
1351 except libvirt.libvirtError as e:
1352 text = e.get_error_message()
1353 print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text
1354
1355 finally:
1356 if conn is not None: conn.close
1357
1358
1359 def create_server(server, db, db_lock, only_of_ports):
1360 #print "server"
1361 #print "server"
1362 #print server
1363 #print "server"
1364 #print "server"
1365 #try:
1366 # host_id = server.get('host_id', None)
1367 extended = server.get('extended', None)
1368
1369 # print '----------------------'
1370 # print json.dumps(extended, indent=4)
1371
1372 requirements={}
1373 requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1374 requirements['ram'] = server['flavor'].get('ram', 0)
1375 if requirements['ram']== None:
1376 requirements['ram'] = 0
1377 requirements['vcpus'] = server['flavor'].get('vcpus', 0)
1378 if requirements['vcpus']== None:
1379 requirements['vcpus'] = 0
1380 #If extended is not defined get requirements from flavor
1381 if extended is None:
1382 #If extended is defined in flavor convert to dictionary and use it
1383 if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
1384 json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
1385 extended = json.loads(json_acceptable_string)
1386 else:
1387 extended = None
1388 #print json.dumps(extended, indent=4)
1389
1390 #For simplicity only one numa VM are supported in the initial implementation
1391 if extended != None:
1392 numas = extended.get('numas', [])
1393 if len(numas)>1:
1394 return (-2, "Multi-NUMA VMs are not supported yet")
1395 #elif len(numas)<1:
1396 # return (-1, "At least one numa must be specified")
1397
1398 #a for loop is used in order to be ready to multi-NUMA VMs
1399 request = []
1400 for numa in numas:
1401 numa_req = {}
1402 numa_req['memory'] = numa.get('memory', 0)
1403 if 'cores' in numa:
1404 numa_req['proc_req_nb'] = numa['cores'] #number of cores or threads to be reserved
1405 numa_req['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1406 numa_req['proc_req_list'] = numa.get('cores-id', None) #list of ids to be assigned to the cores or threads
1407 elif 'paired-threads' in numa:
1408 numa_req['proc_req_nb'] = numa['paired-threads']
1409 numa_req['proc_req_type'] = 'paired-threads'
1410 numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
1411 elif 'threads' in numa:
1412 numa_req['proc_req_nb'] = numa['threads']
1413 numa_req['proc_req_type'] = 'threads'
1414 numa_req['proc_req_list'] = numa.get('threads-id', None)
1415 else:
1416 numa_req['proc_req_nb'] = 0 # by default
1417 numa_req['proc_req_type'] = 'threads'
1418
1419
1420
1421 #Generate a list of sriov and another for physical interfaces
1422 interfaces = numa.get('interfaces', [])
1423 sriov_list = []
1424 port_list = []
1425 for iface in interfaces:
1426 iface['bandwidth'] = int(iface['bandwidth'])
1427 if iface['dedicated'][:3]=='yes':
1428 port_list.append(iface)
1429 else:
1430 sriov_list.append(iface)
1431
1432 #Save lists ordered from more restrictive to less bw requirements
1433 numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
1434 numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)
1435
1436
1437 request.append(numa_req)
1438
1439 # print "----------\n"+json.dumps(request[0], indent=4)
1440 # print '----------\n\n'
1441
1442 #Search in db for an appropriate numa for each requested numa
1443 #at the moment multi-NUMA VMs are not supported
1444 if len(request)>0:
1445 requirements['numa'].update(request[0])
1446 if requirements['numa']['memory']>0:
1447 requirements['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1448 elif requirements['ram']==0:
1449 return (-1, "Memory information not set neither at extended field not at ram")
1450 if requirements['numa']['proc_req_nb']>0:
1451 requirements['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1452 elif requirements['vcpus']==0:
1453 return (-1, "Processor information not set neither at extended field not at vcpus")
1454
1455
1456 db_lock.acquire()
1457 result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
1458 db_lock.release()
1459
1460 if result == -1:
1461 return (-1, content)
1462
1463 numa_id = content['numa_id']
1464 host_id = content['host_id']
1465
1466 #obtain threads_id and calculate pinning
1467 cpu_pinning = []
1468 reserved_threads=[]
1469 if requirements['numa']['proc_req_nb']>0:
1470 db_lock.acquire()
1471 result, content = db.get_table(FROM='resources_core',
1472 SELECT=('id','core_id','thread_id'),
1473 WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
1474 db_lock.release()
1475 if result <= 0:
1476 print content
1477 return -1, content
1478
1479 #convert rows to a dictionary indexed by core_id
1480 cores_dict = {}
1481 for row in content:
1482 if not row['core_id'] in cores_dict:
1483 cores_dict[row['core_id']] = []
1484 cores_dict[row['core_id']].append([row['thread_id'],row['id']])
1485
1486 #In case full cores are requested
1487 paired = 'N'
1488 if requirements['numa']['proc_req_type'] == 'cores':
1489 #Get/create the list of the vcpu_ids
1490 vcpu_id_list = requirements['numa']['proc_req_list']
1491 if vcpu_id_list == None:
1492 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1493
1494 for threads in cores_dict.itervalues():
1495 #we need full cores
1496 if len(threads) != 2:
1497 continue
1498
1499 #set pinning for the first thread
1500 cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )
1501
1502 #reserve so it is not used the second thread
1503 reserved_threads.append(threads[1][1])
1504
1505 if len(vcpu_id_list) == 0:
1506 break
1507
1508 #In case paired threads are requested
1509 elif requirements['numa']['proc_req_type'] == 'paired-threads':
1510 paired = 'Y'
1511 #Get/create the list of the vcpu_ids
1512 if requirements['numa']['proc_req_list'] != None:
1513 vcpu_id_list = []
1514 for pair in requirements['numa']['proc_req_list']:
1515 if len(pair)!=2:
1516 return -1, "Field paired-threads-id not properly specified"
1517 return
1518 vcpu_id_list.append(pair[0])
1519 vcpu_id_list.append(pair[1])
1520 else:
1521 vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))
1522
1523 for threads in cores_dict.itervalues():
1524 #we need full cores
1525 if len(threads) != 2:
1526 continue
1527 #set pinning for the first thread
1528 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1529
1530 #set pinning for the second thread
1531 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1532
1533 if len(vcpu_id_list) == 0:
1534 break
1535
1536 #In case normal threads are requested
1537 elif requirements['numa']['proc_req_type'] == 'threads':
1538 #Get/create the list of the vcpu_ids
1539 vcpu_id_list = requirements['numa']['proc_req_list']
1540 if vcpu_id_list == None:
1541 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1542
1543 for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
1544 threads = cores_dict[threads_index]
1545 #set pinning for the first thread
1546 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1547
1548 #if exists, set pinning for the second thread
1549 if len(threads) == 2 and len(vcpu_id_list) != 0:
1550 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1551
1552 if len(vcpu_id_list) == 0:
1553 break
1554
1555 #Get the source pci addresses for the selected numa
1556 used_sriov_ports = []
1557 for port in requirements['numa']['sriov_list']:
1558 db_lock.acquire()
1559 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1560 db_lock.release()
1561 if result <= 0:
1562 print content
1563 return -1, content
1564 for row in content:
1565 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1566 continue
1567 port['pci'] = row['pci']
1568 if 'mac_address' not in port:
1569 port['mac_address'] = row['mac']
1570 del port['mac']
1571 port['port_id']=row['id']
1572 port['Mbps_used'] = port['bandwidth']
1573 used_sriov_ports.append(row['id'])
1574 break
1575
1576 for port in requirements['numa']['port_list']:
1577 port['Mbps_used'] = None
1578 if port['dedicated'] != "yes:sriov":
1579 port['mac_address'] = port['mac']
1580 del port['mac']
1581 continue
1582 db_lock.acquire()
1583 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1584 db_lock.release()
1585 if result <= 0:
1586 print content
1587 return -1, content
1588 port['Mbps_used'] = content[0]['Mbps']
1589 for row in content:
1590 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1591 continue
1592 port['pci'] = row['pci']
1593 if 'mac_address' not in port:
1594 port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports
1595 del port['mac']
1596 port['port_id']=row['id']
1597 used_sriov_ports.append(row['id'])
1598 break
1599
1600 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1601 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1602
1603 server['host_id'] = host_id
1604
1605
1606 #Generate dictionary for saving in db the instance resources
1607 resources = {}
1608 resources['bridged-ifaces'] = []
1609
1610 numa_dict = {}
1611 numa_dict['interfaces'] = []
1612
1613 numa_dict['interfaces'] += requirements['numa']['port_list']
1614 numa_dict['interfaces'] += requirements['numa']['sriov_list']
1615
1616 #Check bridge information
1617 unified_dataplane_iface=[]
1618 unified_dataplane_iface += requirements['numa']['port_list']
1619 unified_dataplane_iface += requirements['numa']['sriov_list']
1620
1621 for control_iface in server.get('networks', []):
1622 control_iface['net_id']=control_iface.pop('uuid')
1623 #Get the brifge name
1624 db_lock.acquire()
1625 result, content = db.get_table(FROM='nets', SELECT=('name','type', 'vlan'),WHERE={'uuid':control_iface['net_id']} )
1626 db_lock.release()
1627 if result < 0:
1628 pass
1629 elif result==0:
1630 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
1631 else:
1632 network=content[0]
1633 if control_iface.get("type", 'virtual') == 'virtual':
1634 if network['type']!='bridge_data' and network['type']!='bridge_man':
1635 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
1636 resources['bridged-ifaces'].append(control_iface)
1637 else:
1638 if network['type']!='data' and network['type']!='ptp':
1639 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
1640 #dataplane interface, look for it in the numa tree and asign this network
1641 iface_found=False
1642 for dataplane_iface in numa_dict['interfaces']:
1643 if dataplane_iface['name'] == control_iface.get("name"):
1644 if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
1645 (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
1646 (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
1647 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1648 (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
1649 dataplane_iface['uuid'] = control_iface['net_id']
1650 if dataplane_iface['dedicated'] == "no":
1651 dataplane_iface['vlan'] = network['vlan']
1652 if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
1653 dataplane_iface['mac_address'] = control_iface.get("mac_address")
1654 if control_iface.get("vpci"):
1655 dataplane_iface['vpci'] = control_iface.get("vpci")
1656 iface_found=True
1657 break
1658 if not iface_found:
1659 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")
1660
1661 resources['host_id'] = host_id
1662 resources['image_id'] = server['image_id']
1663 resources['flavor_id'] = server['flavor_id']
1664 resources['tenant_id'] = server['tenant_id']
1665 resources['ram'] = requirements['ram']
1666 resources['vcpus'] = requirements['vcpus']
1667 resources['status'] = 'CREATING'
1668
1669 if 'description' in server: resources['description'] = server['description']
1670 if 'name' in server: resources['name'] = server['name']
1671
1672 resources['extended'] = {} #optional
1673 resources['extended']['numas'] = []
1674 numa_dict['numa_id'] = numa_id
1675 numa_dict['memory'] = requirements['numa']['memory']
1676 numa_dict['cores'] = []
1677
1678 for core in cpu_pinning:
1679 numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
1680 for core in reserved_threads:
1681 numa_dict['cores'].append({'id': core})
1682 resources['extended']['numas'].append(numa_dict)
1683 if extended!=None and 'devices' in extended: #TODO allow extra devices without numa
1684 resources['extended']['devices'] = extended['devices']
1685
1686
1687 print '===================================={'
1688 print json.dumps(resources, indent=4)
1689 print '====================================}'
1690
1691 return 0, resources
1692