allow openvim in test mode to run without python-libvirt package installed
[osm/openvim.git] / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 This is the thread that interacts with the host and with libvirt to manage VMs.
26 One thread is launched per host.
27 '''
28 __author__="Pablo Montes, Alfonso Tierno"
29 __date__ ="$10-jul-2014 12:07:15$"
30
31
32 import json
33 import yaml
34 import threading
35 import time
36 import Queue
37 import paramiko
38 from jsonschema import validate as js_v, exceptions as js_e
39 #import libvirt
40 import imp
41 from vim_schema import localinfo_schema, hostinfo_schema
42 import random
43 #from logging import Logger
44 #import auxiliary_functions as af
45
46 #TODO: insert a logging system
47
48 global lvirt_module
49 lvirt_module=None #the libvirt module is loaded lazily, only when not in test mode
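#Note: this lazy loading is what lets test mode run on machines where python-libvirt is not installed;
#host_thread.__init__ performs the actual import the first time a non-test thread is created.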
50
51 class host_thread(threading.Thread):
52 def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode, develop_bridge_iface):
53 '''Init a thread.
54 Arguments:
55 'name': name of the thread
56 'host','user': host ip or name to manage, and the user to connect with
57 'test': when True, run in test mode without contacting the host or libvirt
58 'db', 'db_lock': database class and lock to use it in exclusion
59 '''
60 threading.Thread.__init__(self)
61 self.name = name
62 self.host = host
63 self.user = user
64 self.db = db
65 self.db_lock = db_lock
66 self.test = test
67 if not test:
68 try:
69 global lvirt_module  #rebind the module-level variable, not a new local one
70 module_info = imp.find_module("libvirt")  #locate python-libvirt without importing it yet
71 lvirt_module = imp.load_module("libvirt", *module_info)  #actually import it; find_module alone does not
72 except (IOError, ImportError) as e:
73 raise ImportError("Cannot import python-libvirt. Openvim not properly installed: " + str(e))
74 self.develop_mode = develop_mode
75 self.develop_bridge_iface = develop_bridge_iface
76 self.image_path = image_path
77 self.host_id = host_id
78 self.version = version
79
80 self.xml_level = 0
81 #self.pending ={}
82
83 self.server_status = {} #dictionary with pairs server_uuid:server_status
84 self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
85 self.next_update_server_status = 0 #time when the servers status must be checked again
86
87 self.hostinfo = None
88
89 self.queueLock = threading.Lock()
90 self.taskQueue = Queue.Queue(2000)
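#tasks are tuples whose first element is the action name, e.g. ('instance', req) or ('exit',);
#they are pushed with insert_task() and consumed by the run() loop below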
91
92 def ssh_connect(self):
93 try:
94 #Connect SSH
95 self.ssh_conn = paramiko.SSHClient()
96 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
97 self.ssh_conn.load_system_host_keys()
98 self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
99 except paramiko.ssh_exception.SSHException as e:
100 text = e.args[0]
101 print self.name, ": ssh_connect ssh Exception:", text
102
103 def load_localinfo(self):
104 if not self.test:
105 try:
106 #Connect SSH
107 self.ssh_connect()
108
109 command = 'mkdir -p ' + self.image_path
110 #print self.name, ': command:', command
111 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
112 content = stderr.read()
113 if len(content) > 0:
114 print self.name, ': command:', command, "stderr:", content
115
116 command = 'cat ' + self.image_path + '/.openvim.yaml'
117 #print self.name, ': command:', command
118 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
119 content = stdout.read()
120 if len(content) == 0:
121 print self.name, ': command:', command, "stderr:", stderr.read()
122 raise paramiko.ssh_exception.SSHException("Error empty file ")
123 self.localinfo = yaml.load(content)
124 js_v(self.localinfo, localinfo_schema)
125 self.localinfo_dirty=False
126 if 'server_files' not in self.localinfo:
127 self.localinfo['server_files'] = {}
128 print self.name, ': localinfo loaded from host'
129 return
130
131 except paramiko.ssh_exception.SSHException as e:
132 text = e.args[0]
133 print self.name, ": load_localinfo ssh Exception:", text
134 except lvirt_module.libvirtError as e:
135 text = e.get_error_message()
136 print self.name, ": load_localinfo libvirt Exception:", text
137 except yaml.YAMLError as exc:
138 text = ""
139 if hasattr(exc, 'problem_mark'):
140 mark = exc.problem_mark
141 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
142 print self.name, ": load_localinfo yaml format Exception", text
143 except js_e.ValidationError as e:
144 text = ""
145 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
146 print self.name, ": load_localinfo format Exception:", text, e.message
147 except Exception as e:
148 text = str(e)
149 print self.name, ": load_localinfo Exception:", text
150
151 #not loaded, insert a default data and force saving by activating dirty flag
152 self.localinfo = {'files':{}, 'server_files':{} }
153 #self.localinfo_dirty=True
154 self.localinfo_dirty=False
155
156 def load_hostinfo(self):
157 if self.test:
158 return
159 try:
160 #Connect SSH
161 self.ssh_connect()
162
163
164 command = 'cat ' + self.image_path + '/hostinfo.yaml'
165 #print self.name, ': command:', command
166 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
167 content = stdout.read()
168 if len(content) == 0:
169 print self.name, ': command:', command, "stderr:", stderr.read()
170 raise paramiko.ssh_exception.SSHException("Error empty file ")
171 self.hostinfo = yaml.load(content)
172 js_v(self.hostinfo, hostinfo_schema)
173 print self.name, ': hostinfo loaded from host', self.hostinfo
174 return
175
176 except paramiko.ssh_exception.SSHException as e:
177 text = e.args[0]
178 print self.name, ": load_hostinfo ssh Exception:", text
179 except lvirt_module.libvirtError as e:
180 text = e.get_error_message()
181 print self.name, ": load_hostinfo libvirt Exception:", text
182 except yaml.YAMLError as exc:
183 text = ""
184 if hasattr(exc, 'problem_mark'):
185 mark = exc.problem_mark
186 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
187 print self.name, ": load_hostinfo yaml format Exception", text
188 except js_e.ValidationError as e:
189 text = ""
190 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
191 print self.name, ": load_hostinfo format Exception:", text, e.message
192 except Exception as e:
193 text = str(e)
194 print self.name, ": load_hostinfo Exception:", text
195
196 #not loaded, insert a default data
197 self.hostinfo = None
198
199 def save_localinfo(self, tries=3):
200 if self.test:
201 self.localinfo_dirty = False
202 return
203
204 while tries>=0:
205 tries-=1
206
207 try:
208 command = 'cat > ' + self.image_path + '/.openvim.yaml'
209 print self.name, ': command:', command
210 (stdin, _, _) = self.ssh_conn.exec_command(command)
211 yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
212 self.localinfo_dirty = False
213 break #while tries
214
215 except paramiko.ssh_exception.SSHException as e:
216 text = e.args[0]
217 print self.name, ": save_localinfo ssh Exception:", text
218 if "SSH session not active" in text:
219 self.ssh_connect()
220 except lvirt_module.libvirtError as e:
221 text = e.get_error_message()
222 print self.name, ": save_localinfo libvirt Exception:", text
223 except yaml.YAMLError as exc:
224 text = ""
225 if hasattr(exc, 'problem_mark'):
226 mark = exc.problem_mark
227 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
228 print self.name, ": save_localinfo yaml format Exception", text
229 except Exception as e:
230 text = str(e)
231 print self.name, ": save_localinfo Exception:", text
232
233 def load_servers_from_db(self):
234 self.db_lock.acquire()
235 r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
236 self.db_lock.release()
237
238 self.server_status = {}
239 if r<0:
240 print self.name, ": Error getting data from database:", c
241 return
242 for server in c:
243 self.server_status[ server['uuid'] ] = server['status']
244
245 #convert from old version to new one
246 if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
247 server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
248 if server_files_dict['source file'][-5:] == 'qcow2':
249 server_files_dict['file format'] = 'qcow2'
250
251 self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
252 if 'inc_files' in self.localinfo:
253 del self.localinfo['inc_files']
254 self.localinfo_dirty = True
255
256 def delete_unused_files(self):
257 '''Compares self.localinfo['server_files'] against the running servers in self.server_status (obtained from the database).
258 Deletes unused entries of self.localinfo and the corresponding local files.
259 The only reason for a mismatch is the manual deletion of instances (VM) at the database
260 '''
261 if self.test:
262 return
263 for uuid,images in self.localinfo['server_files'].items():
264 if uuid not in self.server_status:
265 for localfile in images.values():
266 try:
267 print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid)
268 self.delete_file(localfile['source file'])
269 except paramiko.ssh_exception.SSHException as e:
270 print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e))
271 del self.localinfo['server_files'][uuid]
272 self.localinfo_dirty = True
273
274 def insert_task(self, task, *additional):
275 try:
276 self.queueLock.acquire()
277 self.taskQueue.put((task,) + additional, timeout=5)
278 return 1, None
279 except Queue.Full:
280 return -1, "timeout inserting a task over host " + self.name
281 finally:
282 self.queueLock.release()  #release even when the queue is full, otherwise the lock would leak
283 def run(self):
284 while True:
285 self.load_localinfo()
286 self.load_hostinfo()
287 self.load_servers_from_db()
288 self.delete_unused_files()
289 while True:
290 self.queueLock.acquire()
291 if not self.taskQueue.empty():
292 task = self.taskQueue.get()
293 else:
294 task = None
295 self.queueLock.release()
296
297 if task is None:
298 now=time.time()
299 if self.localinfo_dirty:
300 self.save_localinfo()
301 elif self.next_update_server_status < now:
302 self.update_servers_status()
303 self.next_update_server_status = now + 5
304 elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
305 self.server_forceoff()
306 else:
307 time.sleep(1)
308 continue
309
310 if task[0] == 'instance':
311 print self.name, ": processing task instance", task[1]['action']
312 retry=0
313 while retry <2:
314 retry += 1
315 r=self.action_on_server(task[1], retry==2)
316 if r>=0:
317 break
318 elif task[0] == 'image':
319 pass
320 elif task[0] == 'exit':
321 print self.name, ": processing task exit"
322 self.terminate()
323 return 0
324 elif task[0] == 'reload':
325 print self.name, ": processing task reload terminating and relaunching"
326 self.terminate()
327 break
328 elif task[0] == 'edit-iface':
329 print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3])
330 self.edit_iface(task[1], task[2], task[3])
331 elif task[0] == 'restore-iface':
332 print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2])
333 self.restore_iface(task[1], task[2])
334 else:
335 print self.name, ": unknown task", task
336
337 def server_forceoff(self, wait_until_finished=False):
338 while len(self.pending_terminate_server)>0:
339 now = time.time()
340 if self.pending_terminate_server[0][0]>now:
341 if wait_until_finished:
342 time.sleep(1)
343 continue
344 else:
345 return
346 req={'uuid':self.pending_terminate_server[0][1],
347 'action':{'terminate':'force'},
348 'status': None
349 }
350 self.action_on_server(req)
351 self.pending_terminate_server.pop(0)
352
353 def terminate(self):
354 try:
355 self.server_forceoff(True)
356 if self.localinfo_dirty:
357 self.save_localinfo()
358 if not self.test:
359 self.ssh_conn.close()
360 except Exception as e:
361 text = str(e)
362 print self.name, ": terminate Exception:", text
363 print self.name, ": exit from host_thread"
364
365 def get_local_iface_name(self, generic_name):
366 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
367 return self.hostinfo["iface_names"][generic_name]
368 return generic_name
369
370 def create_xml_server(self, server, dev_list, server_metadata={}):
371 """Function that implements the generation of the VM XML definition.
372 Additional devices are in dev_list list
373 The main disk is upon dev_list[0]"""
374
375 #get if operating system is Windows
376 windows_os = False
377 os_type = server_metadata.get('os_type', None)
378 if os_type == None and 'metadata' in dev_list[0]:
379 os_type = dev_list[0]['metadata'].get('os_type', None)
380 if os_type != None and os_type.lower() == "windows":
381 windows_os = True
382 #get type of hard disk bus
383 bus_ide = True if windows_os else False
384 bus = server_metadata.get('bus', None)
385 if bus == None and 'metadata' in dev_list[0]:
386 bus = dev_list[0]['metadata'].get('bus', None)
387 if bus != None:
388 bus_ide = True if bus=='ide' else False
389
390 self.xml_level = 0
391
392 text = "<domain type='kvm'>"
393 #get topology
394 topo = server_metadata.get('topology', None)
395 if topo == None and 'metadata' in dev_list[0]:
396 topo = dev_list[0]['metadata'].get('topology', None)
397 #name
398 name = server.get('name','') + "_" + server['uuid']
399 name = name[:58] #qemu imposes a length limit of 59 chars or the domain will not start. Using 58
400 text += self.inc_tab() + "<name>" + name+ "</name>"
401 #uuid
402 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
403
404 numa={}
405 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
406 numa = server['extended']['numas'][0]
407 #memory
408 use_huge = False
409 memory = int(numa.get('memory',0))*1024*1024 #in KiB
410 if memory==0:
411 memory = int(server['ram'])*1024
412 else:
413 if not self.develop_mode:
414 use_huge = True
415 if memory==0:
416 return -1, 'No memory assigned to instance'
417 memory = str(memory)
418 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
419 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
420 if use_huge:
421 text += self.tab()+'<memoryBacking>'+ \
422 self.inc_tab() + '<hugepages/>'+ \
423 self.dec_tab()+ '</memoryBacking>'
424
425 #cpu
426 use_cpu_pinning=False
427 vcpus = int(server.get("vcpus",0))
428 cpu_pinning = []
429 if 'cores-source' in numa:
430 use_cpu_pinning=True
431 for index in range(0, len(numa['cores-source'])):
432 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
433 vcpus += 1
434 if 'threads-source' in numa:
435 use_cpu_pinning=True
436 for index in range(0, len(numa['threads-source'])):
437 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
438 vcpus += 1
439 if 'paired-threads-source' in numa:
440 use_cpu_pinning=True
441 for index in range(0, len(numa['paired-threads-source'])):
442 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
443 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
444 vcpus += 2
445
446 if use_cpu_pinning and not self.develop_mode:
447 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
448 self.tab()+'<cputune>'
449 self.xml_level += 1
450 for i in range(0, len(cpu_pinning)):
451 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
452 text += self.dec_tab()+'</cputune>'+ \
453 self.tab() + '<numatune>' +\
454 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
455 self.dec_tab() + '</numatune>'
456 else:
457 if vcpus==0:
458 return -1, "Instance without number of cpus"
459 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
460
461 #boot
462 boot_cdrom = False
463 for dev in dev_list:
464 if dev['type']=='cdrom' :
465 boot_cdrom = True
466 break
467 text += self.tab()+ '<os>' + \
468 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
469 if boot_cdrom:
470 text += self.tab() + "<boot dev='cdrom'/>"
471 text += self.tab() + "<boot dev='hd'/>" + \
472 self.dec_tab()+'</os>'
473 #features
474 text += self.tab()+'<features>'+\
475 self.inc_tab()+'<acpi/>' +\
476 self.tab()+'<apic/>' +\
477 self.tab()+'<pae/>'+ \
478 self.dec_tab() +'</features>'
479 if windows_os or topo=="oneSocket":
480 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
481 else:
482 text += self.tab() + "<cpu mode='host-model'></cpu>"
483 text += self.tab() + "<clock offset='utc'/>" +\
484 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
485 self.tab() + "<on_reboot>restart</on_reboot>" + \
486 self.tab() + "<on_crash>restart</on_crash>"
487 text += self.tab() + "<devices>" + \
488 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
489 self.tab() + "<serial type='pty'>" +\
490 self.inc_tab() + "<target port='0'/>" + \
491 self.dec_tab() + "</serial>" +\
492 self.tab() + "<console type='pty'>" + \
493 self.inc_tab()+ "<target type='serial' port='0'/>" + \
494 self.dec_tab()+'</console>'
495 if windows_os:
496 text += self.tab() + "<controller type='usb' index='0'/>" + \
497 self.tab() + "<controller type='ide' index='0'/>" + \
498 self.tab() + "<input type='mouse' bus='ps2'/>" + \
499 self.tab() + "<sound model='ich6'/>" + \
500 self.tab() + "<video>" + \
501 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
502 self.dec_tab() + "</video>" + \
503 self.tab() + "<memballoon model='virtio'/>" + \
504 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
505
506 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
507 #> self.dec_tab()+'</hostdev>\n' +\
508 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
509 if windows_os:
510 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
511 else:
512 #If image contains 'GRAPH' include graphics
513 #if 'GRAPH' in image:
514 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
515 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
516 self.dec_tab() + "</graphics>"
517
518 vd_index = 'a'
519 for dev in dev_list:
520 bus_ide_dev = bus_ide
521 if dev['type']=='cdrom' or dev['type']=='disk':
522 if dev['type']=='cdrom':
523 bus_ide_dev = True
524 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
525 if 'file format' in dev:
526 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
527 if 'source file' in dev:
528 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
529 #elif v['type'] == 'block':
530 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
531 #else:
532 # return -1, 'Unknown disk type ' + v['type']
533 vpci = dev.get('vpci',None)
534 if vpci == None:
535 vpci = dev['metadata'].get('vpci',None)
536 text += self.pci2xml(vpci)
537
538 if bus_ide_dev:
539 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
540 else:
541 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
542 text += self.dec_tab() + '</disk>'
543 vd_index = chr(ord(vd_index)+1)
544 elif dev['type']=='xml':
545 dev_text = dev['xml']
546 if 'vpci' in dev:
547 dev_text = dev_text.replace('__vpci__', dev['vpci'])
548 if 'source file' in dev:
549 dev_text = dev_text.replace('__file__', dev['source file'])
550 if 'file format' in dev:
551 dev_text = dev_text.replace('__format__', dev['file format'])
552 if '__dev__' in dev_text:
553 dev_text = dev_text.replace('__dev__', vd_index)
554 vd_index = chr(ord(vd_index)+1)
555 text += dev_text
556 else:
557 return -1, 'Unknown device type ' + dev['type']
558
559 net_nb=0
560 bridge_interfaces = server.get('networks', [])
561 for v in bridge_interfaces:
562 #Get the bridge name
563 self.db_lock.acquire()
564 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
565 self.db_lock.release()
566 if result <= 0:
567 print "create_xml_server ERROR getting nets",result, content
568 return -1, content
569 #ALF: Allow for the moment the 'default' bridge net because it is convenient for providing internet access to the VM
570 #I know it is not secure
571 #for v in sorted(desc['network interfaces'].itervalues()):
572 model = v.get("model", None)
573 if content[0]['provider']=='default':
574 text += self.tab() + "<interface type='network'>" + \
575 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
576 elif content[0]['provider'][0:7]=='macvtap':
577 text += self.tab()+"<interface type='direct'>" + \
578 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
579 self.tab() + "<target dev='macvtap0'/>"
580 if windows_os:
581 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
582 elif model==None:
583 model = "virtio"
584 elif content[0]['provider'][0:6]=='bridge':
585 text += self.tab() + "<interface type='bridge'>" + \
586 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
587 if windows_os:
588 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
589 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
590 elif model==None:
591 model = "virtio"
592 else:
593 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
594 if model!=None:
595 text += self.tab() + "<model type='" +model+ "'/>"
596 if v.get('mac_address', None) != None:
597 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
598 text += self.pci2xml(v.get('vpci',None))
599 text += self.dec_tab()+'</interface>'
600
601 net_nb += 1
602
603 interfaces = numa.get('interfaces', [])
604
605 net_nb=0
606 for v in interfaces:
607 if self.develop_mode: #map these interfaces to bridges
608 text += self.tab() + "<interface type='bridge'>" + \
609 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
610 if windows_os:
611 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
612 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
613 else:
614 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
615 if v.get('mac_address', None) != None:
616 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
617 text += self.pci2xml(v.get('vpci',None))
618 text += self.dec_tab()+'</interface>'
619 continue
620
621 if v['dedicated'] == 'yes': #passthrough
622 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
623 self.inc_tab() + "<source>"
624 self.inc_tab()
625 text += self.pci2xml(v['source'])
626 text += self.dec_tab()+'</source>'
627 text += self.pci2xml(v.get('vpci',None))
628 if windows_os:
629 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
630 text += self.dec_tab()+'</hostdev>'
631 net_nb += 1
632 else: #sriov_interfaces
633 #skip not connected interfaces
634 if v.get("net_id") == None:
635 continue
636 text += self.tab() + "<interface type='hostdev' managed='yes'>"
637 self.inc_tab()
638 if v.get('mac_address', None) != None:
639 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
640 text+= self.tab()+'<source>'
641 self.inc_tab()
642 text += self.pci2xml(v['source'])
643 text += self.dec_tab()+'</source>'
644 if v.get('vlan',None) != None:
645 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
646 text += self.pci2xml(v.get('vpci',None))
647 if windows_os:
648 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
649 text += self.dec_tab()+'</interface>'
650
651
652 text += self.dec_tab()+'</devices>'+\
653 self.dec_tab()+'</domain>'
654 return 0, text
655
656 def pci2xml(self, pci):
657 '''From a pci text in the format XXXX:XX:XX.X generates the xml content of <address>.
658 Allows an empty pci text'''
659 if pci is None:
660 return ""
661 first_part = pci.split(':')
662 second_part = first_part[2].split('.')
663 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
664 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
665 "' function='0x" + second_part[1] + "'/>"
666
667 def tab(self):
668 """Return indentation according to xml_level"""
669 return "\n" + (' '*self.xml_level)
670
671 def inc_tab(self):
672 """Increment and return indentation according to xml_level"""
673 self.xml_level += 1
674 return self.tab()
675
676 def dec_tab(self):
677 """Decrement and return indentation according to xml_level"""
678 self.xml_level -= 1
679 return self.tab()
680
681 def get_file_info(self, path):
682 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
683 print self.name, ': command:', command
684 (_, stdout, _) = self.ssh_conn.exec_command(command)
685 content = stdout.read()
686 if len(content) == 0:
687 return None # file does not exist
688 else:
689 return content.split(" ") #(permission, 1, owner, group, size, date, file)
690
691 def qemu_get_info(self, path):
692 command = 'qemu-img info ' + path
693 print self.name, ': command:', command
694 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
695 content = stdout.read()
696 if len(content) == 0:
697 error = stderr.read()
698 print self.name, ": get_qemu_info error ", error
699 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
700 else:
701 try:
702 return yaml.load(content)
703 except yaml.YAMLError as exc:
704 text = ""
705 if hasattr(exc, 'problem_mark'):
706 mark = exc.problem_mark
707 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
708 print self.name, ": get_qemu_info yaml format Exception", text
709 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
710
711 def qemu_change_backing(self, inc_file, new_backing_file):
712 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
713 print self.name, ': command:', command
714 (_, _, stderr) = self.ssh_conn.exec_command(command)
715 content = stderr.read()
716 if len(content) == 0:
717 return 0
718 else:
719 print self.name, ": qemu_change_backing error: ", content
720 return -1
721
722 def get_notused_filename(self, proposed_name, suffix=''):
723 '''Look for a non-existing file name in the host
724 proposed_name: proposed file name, includes path
725 suffix: suffix to be added to the name, before the extension
726 '''
727 extension = proposed_name.rfind(".")
728 slash = proposed_name.rfind("/")
729 if extension < 0 or extension < slash: # no extension
730 extension = len(proposed_name)
731 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
732 info = self.get_file_info(target_name)
733 if info is None:
734 return target_name
735
736 index=0
737 while info is not None:
738 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
739 index+=1
740 info = self.get_file_info(target_name)
741 return target_name
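#For example (illustrative values): with proposed_name='/opt/VNF/images/img.qcow2' and suffix='.inc'
#this returns '/opt/VNF/images/img.inc.qcow2', or '/opt/VNF/images/img.inc-0.qcow2' if that file already exists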
742
743 def get_notused_path(self, proposed_path, suffix=''):
744 '''Look for a non-existing path at the images database
745 proposed_path: proposed file name, includes path
746 suffix: suffix to be added to the name, before the extension
747 '''
748 extension = proposed_path.rfind(".")
749 if extension < 0:
750 extension = len(proposed_path)
751 if suffix != None:
752 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
753 index=0
754 while True:
755 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
756 if r<=0:
757 return target_path
758 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
759 index+=1
760
761
762 def delete_file(self, file_name):
763 command = 'rm -f '+file_name
764 print self.name, ': command:', command
765 (_, _, stderr) = self.ssh_conn.exec_command(command)
766 error_msg = stderr.read()
767 if len(error_msg) > 0:
768 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
769
770 def copy_file(self, source, destination, preserve_time=True):
771 if source[0:4]=="http":
772 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
773 dst=destination, src=source, dst_result=destination + ".result" )
774 else:
775 command = 'cp --no-preserve=mode'
776 if preserve_time:
777 command += ' --preserve=timestamps'
778 command += " '{}' '{}'".format(source, destination)
779 print self.name, ': command:', command
780 (_, _, stderr) = self.ssh_conn.exec_command(command)
781 error_msg = stderr.read()
782 if len(error_msg) > 0:
783 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
784
785 def copy_remote_file(self, remote_file, use_incremental):
786 ''' Copy a file from the repository to local folder and recursively
787 copy the backing files in case the remote file is incremental
788 Reads and/or modifies self.localinfo['files'], which contains the
789 unmodified copies of images in the local path
790 params:
791 remote_file: path of remote file
792 use_incremental: None (leave the decision to this function), True, False
793 return:
794 local_file: name of local file
795 qemu_info: dict with qemu information of the local file
796 use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
797 '''
798
799 use_incremental_out = use_incremental
800 new_backing_file = None
801 local_file = None
802 file_from_local = True
803
804 #in case incremental use is not decided, take the decision depending on the image
805 #avoid the use of incremental if this image is already incremental
806 if remote_file[0:4] == "http":
807 file_from_local = False
808 if file_from_local:
809 qemu_remote_info = self.qemu_get_info(remote_file)
810 if use_incremental_out==None:
811 use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
812 #copy the backing files recursively
813 if file_from_local and 'backing file' in qemu_remote_info:
814 new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)
815
816 #check if remote file is present locally
817 if use_incremental_out and remote_file in self.localinfo['files']:
818 local_file = self.localinfo['files'][remote_file]
819 local_file_info = self.get_file_info(local_file)
820 if file_from_local:
821 remote_file_info = self.get_file_info(remote_file)
822 if local_file_info == None:
823 local_file = None
824 elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
825 #local copy of file not valid because date or size are different.
826 #TODO DELETE local file if this file is not used by any active virtual machine
827 try:
828 self.delete_file(local_file)
829 del self.localinfo['files'][remote_file]
830 except Exception:
831 pass
832 local_file = None
833 else: #check that the local file has the same backing file, or there are not backing at all
834 qemu_info = self.qemu_get_info(local_file)
835 if new_backing_file != qemu_info.get('backing file'):
836 local_file = None
837
838
839 if local_file == None: #copy the file
840 img_name= remote_file.split('/') [-1]
841 img_local = self.image_path + '/' + img_name
842 local_file = self.get_notused_filename(img_local)
843 self.copy_file(remote_file, local_file, use_incremental_out)
844
845 if use_incremental_out:
846 self.localinfo['files'][remote_file] = local_file
847 if new_backing_file:
848 self.qemu_change_backing(local_file, new_backing_file)
849 qemu_info = self.qemu_get_info(local_file)
850
851 return local_file, qemu_info, use_incremental_out
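#Illustrative example (paths are assumptions): copy_remote_file('/mnt/repo/base.qcow2', None) on a plain image
#with no backing file copies it into self.image_path and returns something like
#('/opt/VNF/images/base.qcow2', {'file format': 'qcow2', ...}, True)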
852
853 def launch_server(self, conn, server, rebuild=False, domain=None):
854 if self.test:
855 time.sleep(random.randint(20,150)) #sleep a random time to make it a bit more realistic
856 return 0, 'Success'
857
858 server_id = server['uuid']
859 paused = server.get('paused','no')
860 try:
861 if domain!=None and rebuild==False:
862 domain.resume()
863 #self.server_status[server_id] = 'ACTIVE'
864 return 0, 'Success'
865
866 self.db_lock.acquire()
867 result, server_data = self.db.get_instance(server_id)
868 self.db_lock.release()
869 if result <= 0:
870 print self.name, ": launch_server ERROR getting server from DB",result, server_data
871 return result, server_data
872
873 #0: get image metadata
874 server_metadata = server.get('metadata', {})
875 use_incremental = None
876
877 if "use_incremental" in server_metadata:
878 use_incremental = False if server_metadata["use_incremental"]=="no" else True
879
880 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
881 if rebuild:
882 #delete previous incremental files
883 for file_ in server_host_files.values():
884 self.delete_file(file_['source file'] )
885 server_host_files={}
886
887 #1: obtain additional devices (disks)
888 #Put as first device the main disk
889 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
890 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
891 devices += server_data['extended']['devices']
892
893 for dev in devices:
894 if dev['image_id'] == None:
895 continue
896
897 self.db_lock.acquire()
898 result, content = self.db.get_table(FROM='images', SELECT=('path','metadata'),WHERE={'uuid':dev['image_id']} )
899 self.db_lock.release()
900 if result <= 0:
901 error_text = "ERROR", result, content, "when getting image", dev['image_id']
902 print self.name, ": launch_server", error_text
903 return -1, error_text
904 if content[0]['metadata'] is not None:
905 dev['metadata'] = json.loads(content[0]['metadata'])
906 else:
907 dev['metadata'] = {}
908
909 if dev['image_id'] in server_host_files:
910 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
911 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
912 continue
913
914 #2: copy image to host
915 remote_file = content[0]['path']
916 use_incremental_image = use_incremental
917 if dev['metadata'].get("use_incremental") == "no":
918 use_incremental_image = False
919 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
920
921 #create incremental image
922 if use_incremental_image:
923 local_file_inc = self.get_notused_filename(local_file, '.inc')
924 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
925 print 'command:', command
926 (_, _, stderr) = self.ssh_conn.exec_command(command)
927 error_msg = stderr.read()
928 if len(error_msg) > 0:
929 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
930 local_file = local_file_inc
931 qemu_info = {'file format':'qcow2'}
932
933 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
934
935 dev['source file'] = local_file
936 dev['file format'] = qemu_info['file format']
937
938 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
939 self.localinfo_dirty = True
940
941 #3 Create XML
942 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
943 if result <0:
944 print self.name, ": create xml server error:", xml
945 return -2, xml
946 print self.name, ": create xml:", xml
947 attribute = lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
948 #4 Start the domain
949 if not rebuild: #ensures that any pending destroying server is done
950 self.server_forceoff(True)
951 #print self.name, ": launching instance" #, xml
952 conn.createXML(xml, attribute)
953 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
954
955 return 0, 'Success'
956
957 except paramiko.ssh_exception.SSHException as e:
958 text = e.args[0]
959 print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
960 if "SSH session not active" in text:
961 self.ssh_connect()
962 except lvirt_module.libvirtError as e:
963 text = e.get_error_message()
964 print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
965 except Exception as e:
966 text = str(e)
967 print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
968 return -1, text
969
970 def update_servers_status(self):
971 # # virDomainState
972 # VIR_DOMAIN_NOSTATE = 0
973 # VIR_DOMAIN_RUNNING = 1
974 # VIR_DOMAIN_BLOCKED = 2
975 # VIR_DOMAIN_PAUSED = 3
976 # VIR_DOMAIN_SHUTDOWN = 4
977 # VIR_DOMAIN_SHUTOFF = 5
978 # VIR_DOMAIN_CRASHED = 6
979 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
980
981 if self.test or len(self.server_status)==0:
982 return
983
984 try:
985 conn = lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
986 domains= conn.listAllDomains()
987 domain_dict={}
988 for domain in domains:
989 uuid = domain.UUIDString()
990 libvirt_status = domain.state()
991 #print libvirt_status
992 if libvirt_status[0] == lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == lvirt_module.VIR_DOMAIN_SHUTDOWN:
993 new_status = "ACTIVE"
994 elif libvirt_status[0] == lvirt_module.VIR_DOMAIN_PAUSED:
995 new_status = "PAUSED"
996 elif libvirt_status[0] == lvirt_module.VIR_DOMAIN_SHUTOFF:
997 new_status = "INACTIVE"
998 elif libvirt_status[0] == lvirt_module.VIR_DOMAIN_CRASHED:
999 new_status = "ERROR"
1000 else:
1001 new_status = None
1002 domain_dict[uuid] = new_status
1003 conn.close()  #close() must be called; the bare attribute does nothing
1004 except lvirt_module.libvirtError as e:
1005 print self.name, ": get_state() Exception '", e.get_error_message()
1006 return
1007
1008 for server_id, current_status in self.server_status.iteritems():
1009 new_status = None
1010 if server_id in domain_dict:
1011 new_status = domain_dict[server_id]
1012 else:
1013 new_status = "INACTIVE"
1014
1015 if new_status == None or new_status == current_status:
1016 continue
1017 if new_status == 'INACTIVE' and current_status == 'ERROR':
1018 continue #keep ERROR status, because obviously this machine is not running
1019 #change status
1020 print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
1021 STATUS={'progress':100, 'status':new_status}
1022 if new_status == 'ERROR':
1023 STATUS['last_error'] = 'machine has crashed'
1024 self.db_lock.acquire()
1025 r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
1026 self.db_lock.release()
1027 if r>=0:
1028 self.server_status[server_id] = new_status
1029
1030 def action_on_server(self, req, last_retry=True):
1031 '''Perform an action on a req
1032 Attributes:
1033 req: dictionary that contain:
1034 server properties: 'uuid','name','tenant_id','status'
1035 action: 'action'
1036 host properties: 'user', 'ip_name'
1037 return (error, text)
1038 0: No error. VM is updated to new state,
1039 -1: Invalid action, as trying to pause a PAUSED VM
1040 -2: Error accessing host
1041 -3: VM not present
1042 -4: Error at DB access
1043 -5: Error while trying to perform action. VM is updated to ERROR
1044 '''
1045 server_id = req['uuid']
1046 conn = None
1047 new_status = None
1048 old_status = req['status']
1049 last_error = None
1050
1051 if self.test:
1052 if 'terminate' in req['action']:
1053 new_status = 'deleted'
1054 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1055 if req['status']!='ERROR':
1056 time.sleep(5)
1057 new_status = 'INACTIVE'
1058 elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1059 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
1060 elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
1061 elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1062 elif 'rebuild' in req['action']:
1063 time.sleep(random.randint(20,150))
1064 new_status = 'ACTIVE'
1065 elif 'createImage' in req['action']:
1066 time.sleep(5)
1067 self.create_image(None, req)
1068 else:
1069 try:
1070 conn = lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1071 try:
1072 dom = conn.lookupByUUIDString(server_id)
1073 except lvirt_module.libvirtError as e:
1074 text = e.get_error_message()
1075 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1076 dom = None
1077 else:
1078 print self.name, ": action_on_server(",server_id,") libvirt exception:", text
1079 raise e
1080
1081 if 'forceOff' in req['action']:
1082 if dom == None:
1083 print self.name, ": action_on_server(",server_id,") domain not running"
1084 else:
1085 try:
1086 print self.name, ": sending DESTROY to server", server_id
1087 dom.destroy()
1088 except Exception as e:
1089 if "domain is not running" not in e.get_error_message():
1090 print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
1091 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1092 new_status = 'ERROR'
1093
1094 elif 'terminate' in req['action']:
1095 if dom == None:
1096 print self.name, ": action_on_server(",server_id,") domain not running"
1097 new_status = 'deleted'
1098 else:
1099 try:
1100 if req['action']['terminate'] == 'force':
1101 print self.name, ": sending DESTROY to server", server_id
1102 dom.destroy()
1103 new_status = 'deleted'
1104 else:
1105 print self.name, ": sending SHUTDOWN to server", server_id
1106 dom.shutdown()
1107 self.pending_terminate_server.append( (time.time()+10,server_id) )
1108 except Exception as e:
1109 print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
1110 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1111 new_status = 'ERROR'
1112 if "domain is not running" in e.get_error_message():
1113 try:
1114 dom.undefine()
1115 new_status = 'deleted'
1116 except Exception:
1117 print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
1118 last_error = 'action_on_server Exception2 while undefine: ' + e.get_error_message()
1119 #Exception: 'virDomainDetachDevice() failed'
1120 if new_status=='deleted':
1121 if server_id in self.server_status:
1122 del self.server_status[server_id]
1123 if req['uuid'] in self.localinfo['server_files']:
1124 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1125 try:
1126 self.delete_file(file_['source file'])
1127 except Exception:
1128 pass
1129 del self.localinfo['server_files'][ req['uuid'] ]
1130 self.localinfo_dirty = True
1131
1132 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1133 try:
1134 if dom == None:
1135 print self.name, ": action_on_server(",server_id,") domain not running"
1136 else:
1137 dom.shutdown()
1138 # new_status = 'INACTIVE'
1139 #TODO: check status for changing at database
1140 except Exception as e:
1141 new_status = 'ERROR'
1142 print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
1143 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1144
1145 elif 'rebuild' in req['action']:
1146 if dom != None:
1147 dom.destroy()
1148 r = self.launch_server(conn, req, True, None)
1149 if r[0] <0:
1150 new_status = 'ERROR'
1151 last_error = r[1]
1152 else:
1153 new_status = 'ACTIVE'
1154 elif 'start' in req['action']:
1155 #The instance exists only in the database but not in libvirt; it needs to be created
1156 rebuild = True if req['action']['start']=='rebuild' else False
1157 r = self.launch_server(conn, req, rebuild, dom)
1158 if r[0] <0:
1159 new_status = 'ERROR'
1160 last_error = r[1]
1161 else:
1162 new_status = 'ACTIVE'
1163
1164 elif 'resume' in req['action']:
1165 try:
1166 if dom == None:
1167 pass
1168 else:
1169 dom.resume()
1170 # new_status = 'ACTIVE'
1171 except Exception as e:
1172 print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()
1173
1174 elif 'pause' in req['action']:
1175 try:
1176 if dom == None:
1177 pass
1178 else:
1179 dom.suspend()
1180 # new_status = 'PAUSED'
1181 except Exception as e:
1182 print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()
1183
1184 elif 'reboot' in req['action']:
1185 try:
1186 if dom == None:
1187 pass
1188 else:
1189 dom.reboot()
1190 print self.name, ": action_on_server(",server_id,") reboot:"
1191 #new_status = 'ACTIVE'
1192 except Exception as e:
1193 print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
1194 elif 'createImage' in req['action']:
1195 self.create_image(dom, req)
1196
1197
1198 conn.close()
1199 except lvirt_module.libvirtError as e:
1200 if conn is not None: conn.close()
1201 text = e.get_error_message()
1202 new_status = "ERROR"
1203 last_error = text
1204 print self.name, ": action_on_server(",server_id,") Exception '", text
1205 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1206 print self.name, ": action_on_server(",server_id,") Exception removed from host"
1207 #end of if self.test
1208 if new_status == None:
1209 return 1
1210
1211 print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
1212 UPDATE = {'progress':100, 'status':new_status}
1213
1214 if new_status=='ERROR':
1215 if not last_retry: #if there will be another retry do not update database
1216 return -1
1217 elif 'terminate' in req['action']:
1218 #PUT a log in the database
1219 print self.name, ": PANIC deleting server", server_id, last_error
1220 self.db_lock.acquire()
1221 self.db.new_row('logs',
1222 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1223 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1224 )
1225 self.db_lock.release()
1226 if server_id in self.server_status:
1227 del self.server_status[server_id]
1228 return -1
1229 else:
1230 UPDATE['last_error'] = last_error
1231 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1232 self.db_lock.acquire()
1233 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1234 self.server_status[server_id] = new_status
1235 self.db_lock.release()
1236 if new_status == 'ERROR':
1237 return -1
1238 return 1
1239
1240
1241 def restore_iface(self, name, mac, lib_conn=None):
1242 ''' make an ifdown, ifup to restore the default parameters of an interface
1243 Params:
1244 mac: mac address of the interface
1245 lib_conn: connection to the libvirt, if None a new connection is created
1246 Return 0,None if ok, -1,text if fails
1247 '''
1248 conn=None
1249 ret = 0
1250 error_text=None
1251 if self.test:
1252 print self.name, ": restore_iface '%s' %s" % (name, mac)
1253 return 0, None
1254 try:
1255 if not lib_conn:
1256 conn = lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1257 else:
1258 conn = lib_conn
1259
1260 #wait to the pending VM deletion
1261 #TODO.Revise self.server_forceoff(True)
1262
1263 iface = conn.interfaceLookupByMACString(mac)
1264 iface.destroy()
1265 iface.create()
1266 print self.name, ": restore_iface '%s' %s" % (name, mac)
1267 except lvirt_module.libvirtError as e:
1268 error_text = e.get_error_message()
1269 print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
1270 ret=-1
1271 finally:
1272 if lib_conn is None and conn is not None:
1273 conn.close()
1274 return ret, error_text
1275
1276
1277 def create_image(self,dom, req):
1278 if self.test:
1279 if 'path' in req['action']['createImage']:
1280 file_dst = req['action']['createImage']['path']
1281 else:
1282 createImage=req['action']['createImage']
1283 img_name= createImage['source']['path']
1284 index=img_name.rfind('/')
1285 file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
1286 image_status='ACTIVE'
1287 else:
1288 for retry in (0,1):
1289 try:
1290 server_id = req['uuid']
1291 createImage=req['action']['createImage']
1292 file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
1293 if 'path' in req['action']['createImage']:
1294 file_dst = req['action']['createImage']['path']
1295 else:
1296 img_name= createImage['source']['path']
1297 index=img_name.rfind('/')
1298 file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
1299
1300 self.copy_file(file_orig, file_dst)
1301 qemu_info = self.qemu_get_info(file_orig)
1302 if 'backing file' in qemu_info:
1303 for k,v in self.localinfo['files'].items():
1304 if v==qemu_info['backing file']:
1305 self.qemu_change_backing(file_dst, k)
1306 break
1307 image_status='ACTIVE'
1308 break
1309 except paramiko.ssh_exception.SSHException as e:
1310 image_status='ERROR'
1311 error_text = e.args[0]
1312 print self.name, "': create_image(",server_id,") ssh Exception:", error_text
1313 if "SSH session not active" in error_text and retry==0:
1314 self.ssh_connect()
1315 except Exception as e:
1316 image_status='ERROR'
1317 error_text = str(e)
1318 print self.name, "': create_image(",server_id,") Exception:", error_text
1319
1320 #TODO insert a last_error at database
1321 self.db_lock.acquire()
1322 self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
1323 {'uuid':req['new_image']['uuid']}, log=True)
1324 self.db_lock.release()
1325
1326 def edit_iface(self, port_id, old_net, new_net):
1327 #This action implies removing and re-inserting the interface to set the proper parameters
1328 if self.test:
1329 time.sleep(1)
1330 else:
1331 #get iface details
1332 self.db_lock.acquire()
1333 r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
1334 WHERE={'port_id': port_id})
1335 self.db_lock.release()
1336 if r<0:
1337 print self.name, ": edit_iface(",port_id,") DDBB error:", c
1338 return
1339 elif r==0:
1340 print self.name, ": edit_iface(",port_id,") por not found"
1341 return
1342 port=c[0]
1343 if port["model"]!="VF":
1344 print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
1345 return
1346 #create xml detach file
1347 xml=[]
1348 self.xml_level = 2
1349 xml.append("<interface type='hostdev' managed='yes'>")
1350 xml.append(" <mac address='" +port['mac']+ "'/>")
1351 xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
1352 xml.append('</interface>')
1353
1354
1355 try:
1356 conn=None
1357 conn = lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1358 dom = conn.lookupByUUIDString(port["instance_id"])
1359 if old_net:
1360 text="\n".join(xml)
1361 print self.name, ": edit_iface detaching SRIOV interface", text
1362 dom.detachDeviceFlags(text, flags=lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
1363 if new_net:
1364 xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
1365 self.xml_level = 1
1366 xml.append(self.pci2xml(port.get('vpci',None)) )
1367 xml.append('</interface>')
1368 text="\n".join(xml)
1369 print self.name, ": edit_iface attaching SRIOV interface", text
1370 dom.attachDeviceFlags(text, flags=lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
1371
1372 except lvirt_module.libvirtError as e:
1373 text = e.get_error_message()
1374 print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text
1375
1376 finally:
1377 if conn is not None: conn.close()
1378
1379
1380 def create_server(server, db, db_lock, only_of_ports):
1381 #print "server"
1382 #print "server"
1383 #print server
1384 #print "server"
1385 #print "server"
1386 #try:
1387 # host_id = server.get('host_id', None)
1388 extended = server.get('extended', None)
1389
1390 # print '----------------------'
1391 # print json.dumps(extended, indent=4)
1392
1393 requirements={}
1394 requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1395 requirements['ram'] = server['flavor'].get('ram', 0)
1396 if requirements['ram']== None:
1397 requirements['ram'] = 0
1398 requirements['vcpus'] = server['flavor'].get('vcpus', 0)
1399 if requirements['vcpus']== None:
1400 requirements['vcpus'] = 0
1401 #If extended is not defined get requirements from flavor
1402 if extended is None:
1403 #If extended is defined in flavor convert to dictionary and use it
1404 if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
1405 json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
1406 extended = json.loads(json_acceptable_string)
1407 else:
1408 extended = None
1409 #print json.dumps(extended, indent=4)
1410
1411 #For simplicity only single-NUMA VMs are supported in the initial implementation
1412 if extended != None:
1413 numas = extended.get('numas', [])
1414 if len(numas)>1:
1415 return (-2, "Multi-NUMA VMs are not supported yet")
1416 #elif len(numas)<1:
1417 # return (-1, "At least one numa must be specified")
1418
1419 #a for loop is used in order to be ready for multi-NUMA VMs
1420 request = []
1421 for numa in numas:
1422 numa_req = {}
1423 numa_req['memory'] = numa.get('memory', 0)
1424 if 'cores' in numa:
1425 numa_req['proc_req_nb'] = numa['cores'] #number of cores or threads to be reserved
1426 numa_req['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1427 numa_req['proc_req_list'] = numa.get('cores-id', None) #list of ids to be assigned to the cores or threads
1428 elif 'paired-threads' in numa:
1429 numa_req['proc_req_nb'] = numa['paired-threads']
1430 numa_req['proc_req_type'] = 'paired-threads'
1431 numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
1432 elif 'threads' in numa:
1433 numa_req['proc_req_nb'] = numa['threads']
1434 numa_req['proc_req_type'] = 'threads'
1435 numa_req['proc_req_list'] = numa.get('threads-id', None)
1436 else:
1437 numa_req['proc_req_nb'] = 0 # by default
1438 numa_req['proc_req_type'] = 'threads'
1439
1440
1441
1442 #Generate a list of sriov and another for physical interfaces
1443 interfaces = numa.get('interfaces', [])
1444 sriov_list = []
1445 port_list = []
1446 for iface in interfaces:
1447 iface['bandwidth'] = int(iface['bandwidth'])
1448 if iface['dedicated'][:3]=='yes':
1449 port_list.append(iface)
1450 else:
1451 sriov_list.append(iface)
1452
1453 #Save lists ordered from more restrictive to less bw requirements
1454 numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
1455 numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)
1456
1457
1458 request.append(numa_req)
1459
1460 # print "----------\n"+json.dumps(request[0], indent=4)
1461 # print '----------\n\n'
1462
1463 #Search in db for an appropriate numa for each requested numa
1464 #at the moment multi-NUMA VMs are not supported
1465 if len(request)>0:
1466 requirements['numa'].update(request[0])
1467 if requirements['numa']['memory']>0:
1468 requirements['ram']=0 #for the moment, asking for both hugepage and non-hugepage memory is not supported
1469 elif requirements['ram']==0:
1470 return (-1, "Memory information not set either at the extended field or at ram")
1471 if requirements['numa']['proc_req_nb']>0:
1472 requirements['vcpus']=0 #for the moment, asking for both isolated and non-isolated cpus is not supported
1473 elif requirements['vcpus']==0:
1474 return (-1, "Processor information not set either at the extended field or at vcpus")
1475
1476
1477 db_lock.acquire()
1478 result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
1479 db_lock.release()
1480
1481 if result == -1:
1482 return (-1, content)
1483
1484 numa_id = content['numa_id']
1485 host_id = content['host_id']
1486
1487 #obtain threads_id and calculate pinning
1488 cpu_pinning = []
1489 reserved_threads=[]
1490 if requirements['numa']['proc_req_nb']>0:
1491 db_lock.acquire()
1492 result, content = db.get_table(FROM='resources_core',
1493 SELECT=('id','core_id','thread_id'),
1494 WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
1495 db_lock.release()
1496 if result <= 0:
1497 print content
1498 return -1, content
1499
1500 #convert rows to a dictionary indexed by core_id
1501 cores_dict = {}
1502 for row in content:
1503 if not row['core_id'] in cores_dict:
1504 cores_dict[row['core_id']] = []
1505 cores_dict[row['core_id']].append([row['thread_id'],row['id']])
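#cores_dict maps each free core to its available [thread_id, resource_id] pairs, e.g. (hypothetical ids):
#  {2: [[4, 371], [20, 372]], 3: [[5, 373]]}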
1506
1507 #In case full cores are requested
1508 paired = 'N'
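#'paired' records whether vthreads are pinned as hyperthread pairs; it is stored later per pinned core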
1509 if requirements['numa']['proc_req_type'] == 'cores':
1510 #Get/create the list of the vcpu_ids
1511 vcpu_id_list = requirements['numa']['proc_req_list']
1512 if vcpu_id_list == None:
1513 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1514
1515 for threads in cores_dict.itervalues():
1516 #we need full cores
1517 if len(threads) != 2:
1518 continue
1519
1520 #set pinning for the first thread
1521 cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )
1522
1523 #reserve the second thread so it is not used
1524 reserved_threads.append(threads[1][1])
1525
1526 if len(vcpu_id_list) == 0:
1527 break
1528
1529 #In case paired threads are requested
1530 elif requirements['numa']['proc_req_type'] == 'paired-threads':
1531 paired = 'Y'
1532 #Get/create the list of the vcpu_ids
1533 if requirements['numa']['proc_req_list'] != None:
1534 vcpu_id_list = []
1535 for pair in requirements['numa']['proc_req_list']:
1536 if len(pair)!=2:
1537 return -1, "Field paired-threads-id not properly specified"
1539 vcpu_id_list.append(pair[0])
1540 vcpu_id_list.append(pair[1])
1541 else:
1542 vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))
1543
1544 for threads in cores_dict.itervalues():
1545 #we need full cores
1546 if len(threads) != 2:
1547 continue
1548 #set pinning for the first thread
1549 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1550
1551 #set pinning for the second thread
1552 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1553
1554 if len(vcpu_id_list) == 0:
1555 break
1556
1557 #In case normal threads are requested
1558 elif requirements['numa']['proc_req_type'] == 'threads':
1559 #Get/create the list of the vcpu_ids
1560 vcpu_id_list = requirements['numa']['proc_req_list']
1561 if vcpu_id_list == None:
1562 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
1563
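#iterate cores with fewer free threads first, so fully-free cores are kept intact when possible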
1564 for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
1565 threads = cores_dict[threads_index]
1566 #set pinning for the first thread
1567 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
1568
1569 #if exists, set pinning for the second thread
1570 if len(threads) == 2 and len(vcpu_id_list) != 0:
1571 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
1572
1573 if len(vcpu_id_list) == 0:
1574 break
1575
1576 #Get the source pci addresses for the selected numa
1577 used_sriov_ports = []
1578 for port in requirements['numa']['sriov_list']:
1579 db_lock.acquire()
1580 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1581 db_lock.release()
1582 if result <= 0:
1583 print content
1584 return -1, content
1585 for row in content:
1586 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1587 continue
1588 port['pci'] = row['pci']
1589 if 'mac_address' not in port:
1590 port['mac_address'] = row['mac']
1591 del port['mac']
1592 port['port_id']=row['id']
1593 port['Mbps_used'] = port['bandwidth']
1594 used_sriov_ports.append(row['id'])
1595 break
1596
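#Now assign the physical ports: full passthrough ("yes") ports only need the mac key renamed,
#while "yes:sriov" ports also get a pci/port_id assigned and consume the whole port bandwidth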
1597 for port in requirements['numa']['port_list']:
1598 port['Mbps_used'] = None
1599 if port['dedicated'] != "yes:sriov":
1600 port['mac_address'] = port['mac']
1601 del port['mac']
1602 continue
1603 db_lock.acquire()
1604 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
1605 db_lock.release()
1606 if result <= 0:
1607 print content
1608 return -1, content
1609 port['Mbps_used'] = content[0]['Mbps']
1610 for row in content:
1611 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
1612 continue
1613 port['pci'] = row['pci']
1614 if 'mac_address' not in port:
1615 port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports
1616 del port['mac']
1617 port['port_id']=row['id']
1618 used_sriov_ports.append(row['id'])
1619 break
1620
1621 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1622 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1623
1624 server['host_id'] = host_id
1625
1626
1627 #Generate dictionary for saving in db the instance resources
1628 resources = {}
1629 resources['bridged-ifaces'] = []
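#'bridged-ifaces' will collect the virtual (bridge) interfaces; dataplane interfaces are kept
#inside the numa description built below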
1630
1631 numa_dict = {}
1632 numa_dict['interfaces'] = []
1633
1634 numa_dict['interfaces'] += requirements['numa']['port_list']
1635 numa_dict['interfaces'] += requirements['numa']['sriov_list']
1636
1637 #Check bridge information
1638 unified_dataplane_iface=[]
1639 unified_dataplane_iface += requirements['numa']['port_list']
1640 unified_dataplane_iface += requirements['numa']['sriov_list']
1641
1642 for control_iface in server.get('networks', []):
1643 control_iface['net_id']=control_iface.pop('uuid')
1644 #Get the bridge name
1645 db_lock.acquire()
1646 result, content = db.get_table(FROM='nets', SELECT=('name','type', 'vlan'),WHERE={'uuid':control_iface['net_id']} )
1647 db_lock.release()
1648 if result < 0:
1649 pass
1650 elif result==0:
1651 return -1, "Error at field networks: no network found with uuid %s" % control_iface['net_id']
1652 else:
1653 network=content[0]
1654 if control_iface.get("type", 'virtual') == 'virtual':
1655 if network['type']!='bridge_data' and network['type']!='bridge_man':
1656 return -1, "Error at field networks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
1657 resources['bridged-ifaces'].append(control_iface)
1658 else:
1659 if network['type']!='data' and network['type']!='ptp':
1660 return -1, "Error at field networks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
1661 #dataplane interface, look for it in the numa tree and assign this network
1662 iface_found=False
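#match the requested interface by name against the flavor dataplane interfaces and check that
#flavor 'dedicated' and network interface 'type' are consistent (yes<->PF, no<->VF, yes:sriov<->VFnotShared)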
1663 for dataplane_iface in numa_dict['interfaces']:
1664 if dataplane_iface['name'] == control_iface.get("name"):
1665 if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
1666 (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
1667 (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
1668 return -1, "Error at field networks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1669 (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
1670 dataplane_iface['uuid'] = control_iface['net_id']
1671 if dataplane_iface['dedicated'] == "no":
1672 dataplane_iface['vlan'] = network['vlan']
1673 if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
1674 dataplane_iface['mac_address'] = control_iface.get("mac_address")
1675 if control_iface.get("vpci"):
1676 dataplane_iface['vpci'] = control_iface.get("vpci")
1677 iface_found=True
1678 break
1679 if not iface_found:
1680 return -1, "Error at field networks: interface name %s from networks not found at flavor" % control_iface.get("name")
1681
1682 resources['host_id'] = host_id
1683 resources['image_id'] = server['image_id']
1684 resources['flavor_id'] = server['flavor_id']
1685 resources['tenant_id'] = server['tenant_id']
1686 resources['ram'] = requirements['ram']
1687 resources['vcpus'] = requirements['vcpus']
1688 resources['status'] = 'CREATING'
1689
1690 if 'description' in server: resources['description'] = server['description']
1691 if 'name' in server: resources['name'] = server['name']
1692
1693 resources['extended'] = {} #optional
1694 resources['extended']['numas'] = []
1695 numa_dict['numa_id'] = numa_id
1696 numa_dict['memory'] = requirements['numa']['memory']
1697 numa_dict['cores'] = []
1698
1699 for core in cpu_pinning:
1700 numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
1701 for core in reserved_threads:
1702 numa_dict['cores'].append({'id': core})
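#reserved sibling threads are stored without a 'vthread' so they remain allocated to this instance but unused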
1703 resources['extended']['numas'].append(numa_dict)
1704 if extended!=None and 'devices' in extended: #TODO allow extra devices without numa
1705 resources['extended']['devices'] = extended['devices']
1706
1707
1708 print '===================================={'
1709 print json.dumps(resources, indent=4)
1710 print '====================================}'
1711
1712 return 0, resources
1713