Merge branch 'v1.1'
[osm/openvim.git] / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 This is thread that interact with the host and the libvirt to manage VM
26 One thread will be launched per host
27 '''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 from jsonschema import validate as js_v, exceptions as js_e
38 #import libvirt
39 import imp
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 import os
43
#TODO: insert a logging system

# from logging import Logger
# import auxiliary_functions as af

51
class host_thread(threading.Thread):
    """Thread in charge of one compute host.

    One instance is launched per host; it consumes tasks from a queue and
    executes them against the host (over SSH/libvirt) and the openvim database.
    """
    # python-libvirt module; loaded lazily in __init__ on the first non-test
    # thread and shared by every host_thread instance
    lvirt_module = None
54
    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
                 develop_bridge_iface):
        '''Init a thread.
        Arguments:
            'id' number of thead
            'name' name of thread
            'host','user': host ip or name to manage and user
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': True for simulation mode; no real host is contacted
            'image_path': remote folder where VM image files are stored
            'host_id': database uuid of this host
            'version': openvim version string
            'develop_mode': True relaxes requirements (development setups)
            'develop_bridge_iface': bridge interface used in develop_mode
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test

        # Import python-libvirt lazily and only once, so that test mode works
        # on machines without libvirt installed.
        if not test and not host_thread.lvirt_module:
            try:
                module_info = imp.find_module("libvirt")
                host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
            except (IOError, ImportError) as e:
                raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))


        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        self.xml_level = 0  # current indentation depth used by the XML helpers
        #self.pending ={}

        self.server_status = {} #dictionary with pairs server_uuid:server_status
        self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
        self.next_update_server_status = 0 #time when must be check servers status

        self.hostinfo = None

        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)
        self.ssh_conn = None
98
    def ssh_connect(self):
        """Open (or reopen) the SSH session to the managed host.

        Uses system host keys and auto-accepts unknown hosts. SSH errors are
        printed and swallowed, so self.ssh_conn may be left unusable; callers
        detect that through later exec_command failures.
        """
        try:
            #Connect SSH
            self.ssh_conn = paramiko.SSHClient()
            self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_conn.load_system_host_keys()
            self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": ssh_connect ssh Exception:", text
109
    def load_localinfo(self):
        """Load the per-host state file '<image_path>/.openvim.yaml' over SSH.

        The file tracks the image files copied onto the host ('server_files').
        On any failure (ssh, yaml, schema) a default empty localinfo is used.
        """
        if not self.test:
            try:
                #Connect SSH
                self.ssh_connect()

                # ensure the image directory exists before reading from it
                command = 'mkdir -p ' + self.image_path
                #print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stderr.read()
                if len(content) > 0:
                    print self.name, ': command:', command, "stderr:", content

                command = 'cat ' + self.image_path + '/.openvim.yaml'
                #print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stdout.read()
                if len(content) == 0:
                    print self.name, ': command:', command, "stderr:", stderr.read()
                    raise paramiko.ssh_exception.SSHException("Error empty file ")
                # NOTE(review): yaml.load on remote content; the file is written
                # by openvim itself, but yaml.safe_load would be safer — confirm
                self.localinfo = yaml.load(content)
                js_v(self.localinfo, localinfo_schema)
                self.localinfo_dirty=False
                if 'server_files' not in self.localinfo:
                    self.localinfo['server_files'] = {}
                print self.name, ': localinfo load from host'
                return

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                print self.name, ": load_localinfo ssh Exception:", text
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": load_localinfo libvirt Exception:", text
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                print self.name, ": load_localinfo yaml format Exception", text
            except js_e.ValidationError as e:
                text = ""
                if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
                print self.name, ": load_localinfo format Exception:", text, e.message
            except Exception as e:
                text = str(e)
                print self.name, ": load_localinfo Exception:", text

        #not loaded, insert a default data and force saving by activating dirty flag
        self.localinfo = {'files':{}, 'server_files':{} }
        #self.localinfo_dirty=True
        self.localinfo_dirty=False
162
    def load_hostinfo(self):
        """Load '<image_path>/hostinfo.yaml' from the host over SSH.

        The file carries host-specific data such as the generic-to-local
        interface name map (used by get_local_iface_name). On any failure
        self.hostinfo is left as None.
        """
        if self.test:
            return;
        try:
            #Connect SSH
            self.ssh_connect()


            command = 'cat ' + self.image_path + '/hostinfo.yaml'
            #print self.name, ': command:', command
            (_, stdout, stderr) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            if len(content) == 0:
                print self.name, ': command:', command, "stderr:", stderr.read()
                raise paramiko.ssh_exception.SSHException("Error empty file ")
            # NOTE(review): yaml.load on remote content; yaml.safe_load would
            # be safer — confirm the file is always openvim-managed
            self.hostinfo = yaml.load(content)
            js_v(self.hostinfo, hostinfo_schema)
            print self.name, ': hostlinfo load from host', self.hostinfo
            return

        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": load_hostinfo ssh Exception:", text
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": load_hostinfo libvirt Exception:", text
        except yaml.YAMLError as exc:
            text = ""
            if hasattr(exc, 'problem_mark'):
                mark = exc.problem_mark
                text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
            print self.name, ": load_hostinfo yaml format Exception", text
        except js_e.ValidationError as e:
            text = ""
            if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
            print self.name, ": load_hostinfo format Exception:", text, e.message
        except Exception as e:
            text = str(e)
            print self.name, ": load_hostinfo Exception:", text

        #not loaded, insert a default data
        self.hostinfo = None
205
    def save_localinfo(self, tries=3):
        """Write self.localinfo as YAML to '<image_path>/.openvim.yaml'.

        Retries up to 'tries' times, reconnecting the SSH session when it has
        dropped. Clears self.localinfo_dirty on success (always in test mode).
        """
        if self.test:
            self.localinfo_dirty = False
            return

        while tries>=0:
            tries-=1

            try:
                command = 'cat > ' + self.image_path + '/.openvim.yaml'
                print self.name, ': command:', command
                (stdin, _, _) = self.ssh_conn.exec_command(command)
                # stream the YAML straight into the remote 'cat' stdin
                yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
                self.localinfo_dirty = False
                break #while tries

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                print self.name, ": save_localinfo ssh Exception:", text
                if "SSH session not active" in text:
                    self.ssh_connect()
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": save_localinfo libvirt Exception:", text
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                print self.name, ": save_localinfo yaml format Exception", text
            except Exception as e:
                text = str(e)
                print self.name, ": save_localinfo Exception:", text
239
    def load_servers_from_db(self):
        """Refresh self.server_status from the 'instances' table for this host.

        Also migrates the legacy localinfo 'inc_files' layout into the newer
        'server_files' layout, marking localinfo dirty when migration ran.
        """
        self.db_lock.acquire()
        r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
        self.db_lock.release()

        self.server_status = {}
        if r<0:
            print self.name, ": Error getting data from database:", c
            return
        for server in c:
            self.server_status[ server['uuid'] ] = server['status']

            #convert from old version to new one
            if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
                server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
                if server_files_dict['source file'][-5:] == 'qcow2':
                    server_files_dict['file format'] = 'qcow2'

                self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
        # drop the legacy key once every server has been migrated
        if 'inc_files' in self.localinfo:
            del self.localinfo['inc_files']
            self.localinfo_dirty = True
262
    def delete_unused_files(self):
        '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
        Deletes unused entries at self.loacalinfo and the corresponding local files.
        The only reason for this mismatch is the manual deletion of instances (VM) at database
        '''
        if self.test:
            return
        # python2 .items() returns a list copy, so deleting dict entries while
        # iterating is safe here
        for uuid,images in self.localinfo['server_files'].items():
            if uuid not in self.server_status:
                for localfile in images.values():
                    try:
                        print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid)
                        self.delete_file(localfile['source file'])
                    except paramiko.ssh_exception.SSHException as e:
                        print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e))
                del self.localinfo['server_files'][uuid]
                self.localinfo_dirty = True
280
281 def insert_task(self, task, *aditional):
282 try:
283 self.queueLock.acquire()
284 task = self.taskQueue.put( (task,) + aditional, timeout=5)
285 self.queueLock.release()
286 return 1, None
287 except Queue.Full:
288 return -1, "timeout inserting a task over host " + self.name
289
    def run(self):
        """Thread main loop: reload host state, then consume queued tasks.

        While idle it saves dirty localinfo, refreshes server status every 5
        seconds and runs pending forced power-offs. A 'reload' task (and the
        ovs bridge/vxlan tasks, which 'break') restarts the outer reload loop;
        'exit' terminates the thread.
        """
        while True:
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                # pop one task (or None) under the queue lock
                self.queueLock.acquire()
                if not self.taskQueue.empty():
                    task = self.taskQueue.get()
                else:
                    task = None
                self.queueLock.release()

                if task is None:
                    # idle housekeeping: persist localinfo, poll server status,
                    # run delayed force-off, otherwise sleep a second
                    now=time.time()
                    if self.localinfo_dirty:
                        self.save_localinfo()
                    elif self.next_update_server_status < now:
                        self.update_servers_status()
                        self.next_update_server_status = now + 5
                    elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                        self.server_forceoff()
                    else:
                        time.sleep(1)
                    continue

                if task[0] == 'instance':
                    print self.name, ": processing task instance", task[1]['action']
                    # one retry; the second attempt passes last_retry=True
                    retry=0
                    while retry <2:
                        retry += 1
                        r=self.action_on_server(task[1], retry==2)
                        if r>=0:
                            break
                elif task[0] == 'image':
                    pass
                elif task[0] == 'exit':
                    print self.name, ": processing task exit"
                    self.terminate()
                    return 0
                elif task[0] == 'reload':
                    print self.name, ": processing task reload terminating and relaunching"
                    self.terminate()
                    break
                elif task[0] == 'edit-iface':
                    print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3])
                    self.edit_iface(task[1], task[2], task[3])
                elif task[0] == 'restore-iface':
                    print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2])
                    self.restore_iface(task[1], task[2])
                elif task[0] == 'new-ovsbridge':
                    print self.name, ": Creating compute OVS bridge"
                    self.create_ovs_bridge()
                    break
                elif task[0] == 'new-vxlan':
                    print self.name, ": Creating vxlan tunnel=" + task[1] + ", remote ip=" + task[2]
                    self.create_ovs_vxlan_tunnel(task[1], task[2])
                    break
                elif task[0] == 'del-ovsbridge':
                    print self.name, ": Deleting OVS bridge"
                    self.delete_ovs_bridge()
                    break
                elif task[0] == 'del-vxlan':
                    print self.name, ": Deleting vxlan " + task[1] + " tunnel"
                    self.delete_ovs_vxlan_tunnel(task[1])
                    break
                elif task[0] == 'create-ovs-bridge-port':
                    print self.name, ": Adding port ovim-" + task[1] + " to OVS bridge"
                    self.create_ovs_bridge_port(task[1])
                elif task[0] == 'del-ovs-port':
                    self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                else:
                    print self.name, ": unknown task", task
364
365 def server_forceoff(self, wait_until_finished=False):
366 while len(self.pending_terminate_server)>0:
367 now = time.time()
368 if self.pending_terminate_server[0][0]>now:
369 if wait_until_finished:
370 time.sleep(1)
371 continue
372 else:
373 return
374 req={'uuid':self.pending_terminate_server[0][1],
375 'action':{'terminate':'force'},
376 'status': None
377 }
378 self.action_on_server(req)
379 self.pending_terminate_server.pop(0)
380
    def terminate(self):
        """Flush pending work and close the SSH session before thread exit.

        Waits for pending forced power-offs to complete, saves localinfo when
        dirty and closes the connection (skipped in test mode). Any exception
        is printed and swallowed so shutdown always completes.
        """
        try:
            self.server_forceoff(True)
            if self.localinfo_dirty:
                self.save_localinfo()
            if not self.test:
                self.ssh_conn.close()
        except Exception as e:
            text = str(e)
            print self.name, ": terminate Exception:", text
        print self.name, ": exit from host_thread"
392
393 def get_local_iface_name(self, generic_name):
394 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
395 return self.hostinfo["iface_names"][generic_name]
396 return generic_name
397
398 def create_xml_server(self, server, dev_list, server_metadata={}):
399 """Function that implements the generation of the VM XML definition.
400 Additional devices are in dev_list list
401 The main disk is upon dev_list[0]"""
402
403 #get if operating system is Windows
404 windows_os = False
405 os_type = server_metadata.get('os_type', None)
406 if os_type == None and 'metadata' in dev_list[0]:
407 os_type = dev_list[0]['metadata'].get('os_type', None)
408 if os_type != None and os_type.lower() == "windows":
409 windows_os = True
410 #get type of hard disk bus
411 bus_ide = True if windows_os else False
412 bus = server_metadata.get('bus', None)
413 if bus == None and 'metadata' in dev_list[0]:
414 bus = dev_list[0]['metadata'].get('bus', None)
415 if bus != None:
416 bus_ide = True if bus=='ide' else False
417
418 self.xml_level = 0
419
420 text = "<domain type='kvm'>"
421 #get topology
422 topo = server_metadata.get('topology', None)
423 if topo == None and 'metadata' in dev_list[0]:
424 topo = dev_list[0]['metadata'].get('topology', None)
425 #name
426 name = server.get('name','') + "_" + server['uuid']
427 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
428 text += self.inc_tab() + "<name>" + name+ "</name>"
429 #uuid
430 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
431
432 numa={}
433 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
434 numa = server['extended']['numas'][0]
435 #memory
436 use_huge = False
437 memory = int(numa.get('memory',0))*1024*1024 #in KiB
438 if memory==0:
439 memory = int(server['ram'])*1024;
440 else:
441 if not self.develop_mode:
442 use_huge = True
443 if memory==0:
444 return -1, 'No memory assigned to instance'
445 memory = str(memory)
446 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
447 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
448 if use_huge:
449 text += self.tab()+'<memoryBacking>'+ \
450 self.inc_tab() + '<hugepages/>'+ \
451 self.dec_tab()+ '</memoryBacking>'
452
453 #cpu
454 use_cpu_pinning=False
455 vcpus = int(server.get("vcpus",0))
456 cpu_pinning = []
457 if 'cores-source' in numa:
458 use_cpu_pinning=True
459 for index in range(0, len(numa['cores-source'])):
460 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
461 vcpus += 1
462 if 'threads-source' in numa:
463 use_cpu_pinning=True
464 for index in range(0, len(numa['threads-source'])):
465 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
466 vcpus += 1
467 if 'paired-threads-source' in numa:
468 use_cpu_pinning=True
469 for index in range(0, len(numa['paired-threads-source'])):
470 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
471 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
472 vcpus += 2
473
474 if use_cpu_pinning and not self.develop_mode:
475 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
476 self.tab()+'<cputune>'
477 self.xml_level += 1
478 for i in range(0, len(cpu_pinning)):
479 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
480 text += self.dec_tab()+'</cputune>'+ \
481 self.tab() + '<numatune>' +\
482 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
483 self.dec_tab() + '</numatune>'
484 else:
485 if vcpus==0:
486 return -1, "Instance without number of cpus"
487 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
488
489 #boot
490 boot_cdrom = False
491 for dev in dev_list:
492 if dev['type']=='cdrom' :
493 boot_cdrom = True
494 break
495 text += self.tab()+ '<os>' + \
496 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
497 if boot_cdrom:
498 text += self.tab() + "<boot dev='cdrom'/>"
499 text += self.tab() + "<boot dev='hd'/>" + \
500 self.dec_tab()+'</os>'
501 #features
502 text += self.tab()+'<features>'+\
503 self.inc_tab()+'<acpi/>' +\
504 self.tab()+'<apic/>' +\
505 self.tab()+'<pae/>'+ \
506 self.dec_tab() +'</features>'
507 if windows_os or topo=="oneSocket":
508 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
509 else:
510 text += self.tab() + "<cpu mode='host-model'></cpu>"
511 text += self.tab() + "<clock offset='utc'/>" +\
512 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
513 self.tab() + "<on_reboot>restart</on_reboot>" + \
514 self.tab() + "<on_crash>restart</on_crash>"
515 text += self.tab() + "<devices>" + \
516 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
517 self.tab() + "<serial type='pty'>" +\
518 self.inc_tab() + "<target port='0'/>" + \
519 self.dec_tab() + "</serial>" +\
520 self.tab() + "<console type='pty'>" + \
521 self.inc_tab()+ "<target type='serial' port='0'/>" + \
522 self.dec_tab()+'</console>'
523 if windows_os:
524 text += self.tab() + "<controller type='usb' index='0'/>" + \
525 self.tab() + "<controller type='ide' index='0'/>" + \
526 self.tab() + "<input type='mouse' bus='ps2'/>" + \
527 self.tab() + "<sound model='ich6'/>" + \
528 self.tab() + "<video>" + \
529 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
530 self.dec_tab() + "</video>" + \
531 self.tab() + "<memballoon model='virtio'/>" + \
532 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
533
534 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
535 #> self.dec_tab()+'</hostdev>\n' +\
536 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
537 if windows_os:
538 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
539 else:
540 #If image contains 'GRAPH' include graphics
541 #if 'GRAPH' in image:
542 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
543 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
544 self.dec_tab() + "</graphics>"
545
546 vd_index = 'a'
547 for dev in dev_list:
548 bus_ide_dev = bus_ide
549 if dev['type']=='cdrom' or dev['type']=='disk':
550 if dev['type']=='cdrom':
551 bus_ide_dev = True
552 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
553 if 'file format' in dev:
554 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
555 if 'source file' in dev:
556 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
557 #elif v['type'] == 'block':
558 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
559 #else:
560 # return -1, 'Unknown disk type ' + v['type']
561 vpci = dev.get('vpci',None)
562 if vpci == None:
563 vpci = dev['metadata'].get('vpci',None)
564 text += self.pci2xml(vpci)
565
566 if bus_ide_dev:
567 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
568 else:
569 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
570 text += self.dec_tab() + '</disk>'
571 vd_index = chr(ord(vd_index)+1)
572 elif dev['type']=='xml':
573 dev_text = dev['xml']
574 if 'vpci' in dev:
575 dev_text = dev_text.replace('__vpci__', dev['vpci'])
576 if 'source file' in dev:
577 dev_text = dev_text.replace('__file__', dev['source file'])
578 if 'file format' in dev:
579 dev_text = dev_text.replace('__format__', dev['source file'])
580 if '__dev__' in dev_text:
581 dev_text = dev_text.replace('__dev__', vd_index)
582 vd_index = chr(ord(vd_index)+1)
583 text += dev_text
584 else:
585 return -1, 'Unknown device type ' + dev['type']
586
587 net_nb=0
588 bridge_interfaces = server.get('networks', [])
589 for v in bridge_interfaces:
590 #Get the brifge name
591 self.db_lock.acquire()
592 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
593 self.db_lock.release()
594 if result <= 0:
595 print "create_xml_server ERROR getting nets",result, content
596 return -1, content
597 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
598 #I know it is not secure
599 #for v in sorted(desc['network interfaces'].itervalues()):
600 model = v.get("model", None)
601 if content[0]['provider']=='default':
602 text += self.tab() + "<interface type='network'>" + \
603 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
604 elif content[0]['provider'][0:7]=='macvtap':
605 text += self.tab()+"<interface type='direct'>" + \
606 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
607 self.tab() + "<target dev='macvtap0'/>"
608 if windows_os:
609 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
610 elif model==None:
611 model = "virtio"
612 elif content[0]['provider'][0:6]=='bridge':
613 text += self.tab() + "<interface type='bridge'>" + \
614 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
615 if windows_os:
616 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
617 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
618 elif model==None:
619 model = "virtio"
620 elif content[0]['provider'][0:3] == "OVS":
621 vlan = content[0]['provider'].replace('OVS:', '')
622 text += self.tab() + "<interface type='bridge'>" + \
623 self.inc_tab() + "<source bridge='ovim-" + vlan + "'/>"
624 else:
625 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
626 if model!=None:
627 text += self.tab() + "<model type='" +model+ "'/>"
628 if v.get('mac_address', None) != None:
629 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
630 text += self.pci2xml(v.get('vpci',None))
631 text += self.dec_tab()+'</interface>'
632
633 net_nb += 1
634
635 interfaces = numa.get('interfaces', [])
636
637 net_nb=0
638 for v in interfaces:
639 if self.develop_mode: #map these interfaces to bridges
640 text += self.tab() + "<interface type='bridge'>" + \
641 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
642 if windows_os:
643 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
644 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
645 else:
646 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
647 if v.get('mac_address', None) != None:
648 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
649 text += self.pci2xml(v.get('vpci',None))
650 text += self.dec_tab()+'</interface>'
651 continue
652
653 if v['dedicated'] == 'yes': #passthrought
654 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
655 self.inc_tab() + "<source>"
656 self.inc_tab()
657 text += self.pci2xml(v['source'])
658 text += self.dec_tab()+'</source>'
659 text += self.pci2xml(v.get('vpci',None))
660 if windows_os:
661 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
662 text += self.dec_tab()+'</hostdev>'
663 net_nb += 1
664 else: #sriov_interfaces
665 #skip not connected interfaces
666 if v.get("net_id") == None:
667 continue
668 text += self.tab() + "<interface type='hostdev' managed='yes'>"
669 self.inc_tab()
670 if v.get('mac_address', None) != None:
671 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
672 text+= self.tab()+'<source>'
673 self.inc_tab()
674 text += self.pci2xml(v['source'])
675 text += self.dec_tab()+'</source>'
676 if v.get('vlan',None) != None:
677 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
678 text += self.pci2xml(v.get('vpci',None))
679 if windows_os:
680 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
681 text += self.dec_tab()+'</interface>'
682
683
684 text += self.dec_tab()+'</devices>'+\
685 self.dec_tab()+'</domain>'
686 return 0, text
687
688 def pci2xml(self, pci):
689 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
690 alows an empty pci text'''
691 if pci is None:
692 return ""
693 first_part = pci.split(':')
694 second_part = first_part[2].split('.')
695 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
696 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
697 "' function='0x" + second_part[1] + "'/>"
698
    def tab(self):
        """Return a newline plus indentation according to xml_level"""
        return "\n" + ('  '*self.xml_level)
702
703 def inc_tab(self):
704 """Increment and return indentation according to xml_level"""
705 self.xml_level += 1
706 return self.tab()
707
708 def dec_tab(self):
709 """Decrement and return indentation according to xml_level"""
710 self.xml_level -= 1
711 return self.tab()
712
713 def create_ovs_bridge(self):
714 """
715 Create a bridge in compute OVS to allocate VMs
716 :return: True if success
717 """
718 if self.test:
719 return
720 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
721 print self.name, ': command:', command
722 (_, stdout, _) = self.ssh_conn.exec_command(command)
723 content = stdout.read()
724 if len(content) == 0:
725 return True
726 else:
727 return False
728
729 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
730 """
731 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
732 :param vlan: vlan port id
733 :param net_uuid: network id
734 :return:
735 """
736
737 if self.test:
738 return
739
740 port_name = 'ovim-' + vlan
741 command = 'sudo ovs-vsctl del-port br-int ' + port_name
742 print self.name, ': command:', command
743 (_, stdout, _) = self.ssh_conn.exec_command(command)
744 content = stdout.read()
745 if len(content) == 0:
746 return True
747 else:
748 return False
749
    def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
        """
        Delete dhcp server process lining in namespace
        :param vlan: segmentation id
        :param net_uuid: network uuid
        :param dhcp_path: conf fiel path that live in namespace side
        :return:
        """
        if self.test:
            return
        # nothing to do while some port of the net still uses this dhcp
        if not self.is_dhcp_port_free(vlan, net_uuid):
            return True

        net_namespace = 'ovim-' + vlan
        dhcp_path = os.path.join(dhcp_path, net_namespace)
        pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

        # read the dnsmasq pid recorded inside the namespace ...
        command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file
        print self.name, ': command:', command
        (_, stdout, _) = self.ssh_conn.exec_command(command)
        content = stdout.read()

        # ... and kill that process
        command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content
        print self.name, ': command:', command
        (_, stdout, _) = self.ssh_conn.exec_command(command)
        content = stdout.read()

        # if len(content) == 0:
        #     return True
        # else:
        #     return False
781
782 def is_dhcp_port_free(self, host_id, net_uuid):
783 """
784 Check if any port attached to the a net in a vxlan mesh across computes nodes
785 :param host_id: host id
786 :param net_uuid: network id
787 :return: True if is not free
788 """
789 self.db_lock.acquire()
790 result, content = self.db.get_table(
791 FROM='ports',
792 WHERE={'p.type': 'instance:ovs', 'p.net_id': net_uuid}
793 )
794 self.db_lock.release()
795
796 if len(content) > 0:
797 return False
798 else:
799 return True
800
801 def is_port_free(self, host_id, net_uuid):
802 """
803 Check if there not ovs ports of a network in a compute host.
804 :param host_id: host id
805 :param net_uuid: network id
806 :return: True if is not free
807 """
808
809 self.db_lock.acquire()
810 result, content = self.db.get_table(
811 FROM='ports as p join instances as i on p.instance_id=i.uuid',
812 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
813 )
814 self.db_lock.release()
815
816 if len(content) > 0:
817 return False
818 else:
819 return True
820
821 def add_port_to_ovs_bridge(self, vlan):
822 """
823 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
824 :param vlan: vlan port id
825 :return: True if success
826 """
827
828 if self.test:
829 return
830
831 port_name = 'ovim-' + vlan
832 command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + vlan
833 print self.name, ': command:', command
834 (_, stdout, _) = self.ssh_conn.exec_command(command)
835 content = stdout.read()
836 if len(content) == 0:
837 return True
838 else:
839 return False
840
841 def delete_dhcp_port(self, vlan, net_uuid):
842 """
843 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
844 :param vlan: segmentation id
845 :param net_uuid: network id
846 :return: True if success
847 """
848
849 if self.test:
850 return
851
852 if not self.is_dhcp_port_free(vlan, net_uuid):
853 return True
854 self.delete_dhcp_interfaces(vlan)
855 return True
856
857 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
858 """
859 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
860 :param vlan:
861 :param net_uuid:
862 :return: True if success
863 """
864 if self.test:
865 return
866
867 if not self.is_port_free(vlan, net_uuid):
868 return True
869 self.delete_port_to_ovs_bridge(vlan, net_uuid)
870 self.delete_linux_bridge(vlan)
871 return True
872
    def delete_linux_bridge(self, vlan):
        """
        Delete a linux bridge in a scpecific compute.
        :param vlan: vlan port id
        :return: True if success
        """

        if self.test:
            return

        port_name = 'ovim-' + vlan
        # bring the veth end down first; its result is deliberately ignored
        command = 'sudo ip link set dev veth0-' + vlan + ' down'
        print self.name, ': command:', command
        (_, stdout, _) = self.ssh_conn.exec_command(command)
        content = stdout.read()
        #
        # if len(content) != 0:
        #     return False

        # then bring the bridge down and delete it in one shot
        command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name
        print self.name, ': command:', command
        (_, stdout, _) = self.ssh_conn.exec_command(command)
        content = stdout.read()
        if len(content) == 0:
            return True
        else:
            return False
900
    def create_ovs_bridge_port(self, vlan):
        """
        Generate a linux bridge and attache the port to a OVS bridge
        :param vlan: vlan port id
        :return:
        """
        if self.test:
            return
        # first create the plain linux bridge, then plug it into br-int
        self.create_linux_bridge(vlan)
        self.add_port_to_ovs_bridge(vlan)
911
912 def create_linux_bridge(self, vlan):
913 """
914 Create a linux bridge with STP active
915 :param vlan: netowrk vlan id
916 :return:
917 """
918
919 if self.test:
920 return
921
922 port_name = 'ovim-' + vlan
923 command = 'sudo brctl show | grep ' + port_name
924 print self.name, ': command:', command
925 (_, stdout, _) = self.ssh_conn.exec_command(command)
926 content = stdout.read()
927
928 # if exist nothing to create
929 # if len(content) == 0:
930 # return False
931
932 command = 'sudo brctl addbr ' + port_name
933 print self.name, ': command:', command
934 (_, stdout, _) = self.ssh_conn.exec_command(command)
935 content = stdout.read()
936
937 # if len(content) == 0:
938 # return True
939 # else:
940 # return False
941
942 command = 'sudo brctl stp ' + port_name + ' on'
943 print self.name, ': command:', command
944 (_, stdout, _) = self.ssh_conn.exec_command(command)
945 content = stdout.read()
946
947 # if len(content) == 0:
948 # return True
949 # else:
950 # return False
951 command = 'sudo ip link set dev ' + port_name + ' up'
952 print self.name, ': command:', command
953 (_, stdout, _) = self.ssh_conn.exec_command(command)
954 content = stdout.read()
955
956 if len(content) == 0:
957 return True
958 else:
959 return False
960
961 def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
962 """
963 Write into dhcp conf file a rule to assigned a fixed ip given to an specific MAC address
964 :param ip: IP address asigned to a VM
965 :param mac: VM vnic mac to be macthed with the IP received
966 :param vlan: Segmentation id
967 :param netmask: netmask value
968 :param path: dhcp conf file path that live in namespace side
969 :return: True if success
970 """
971
972 if self.test:
973 return
974
975 net_namespace = 'ovim-' + vlan
976 dhcp_path = os.path.join(dhcp_path, net_namespace)
977 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
978
979 if not ip:
980 return False
981
982 ip_data = mac.upper() + ',' + ip
983
984 command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
985 print self.name, ': command:', command
986 (_, stdout, _) = self.ssh_conn.exec_command(command)
987 content = stdout.read()
988
989 command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'
990
991 print self.name, ': command:', command
992 (_, stdout, _) = self.ssh_conn.exec_command(command)
993 content = stdout.read()
994
995 if len(content) == 0:
996 return True
997 else:
998 return False
999
1000 def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path):
1001 """
1002 Delete into dhcp conf file the ip assigned to a specific MAC address
1003
1004 :param ip: IP address asigned to a VM
1005 :param mac: VM vnic mac to be macthed with the IP received
1006 :param vlan: Segmentation id
1007 :param dhcp_path: dhcp conf file path that live in namespace side
1008 :return:
1009 """
1010
1011 if self.test:
1012 return
1013
1014 net_namespace = 'ovim-' + vlan
1015 dhcp_path = os.path.join(dhcp_path, net_namespace)
1016 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1017
1018 if not ip:
1019 return False
1020
1021 ip_data = mac.upper() + ',' + ip
1022
1023 command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir
1024 print self.name, ': command:', command
1025 (_, stdout, _) = self.ssh_conn.exec_command(command)
1026 content = stdout.read()
1027
1028 if len(content) == 0:
1029 return True
1030 else:
1031 return False
1032
1033 def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path):
1034 """
1035 Generate a linux bridge and attache the port to a OVS bridge
1036 :param self:
1037 :param vlan: Segmentation id
1038 :param ip_range: IP dhcp range
1039 :param netmask: network netmask
1040 :param dhcp_path: dhcp conf file path that live in namespace side
1041 :return: True if success
1042 """
1043
1044 if self.test:
1045 return
1046
1047 interface = 'tap-' + vlan
1048 net_namespace = 'ovim-' + vlan
1049 dhcp_path = os.path.join(dhcp_path, net_namespace)
1050 leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
1051 pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')
1052
1053 dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask
1054
1055 command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
1056 print self.name, ': command:', command
1057 (_, stdout, _) = self.ssh_conn.exec_command(command)
1058 content = stdout.read()
1059
1060 pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')
1061 command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
1062 print self.name, ': command:', command
1063 (_, stdout, _) = self.ssh_conn.exec_command(command)
1064 content = stdout.read()
1065 # check if pid is runing
1066 pid_status_path = content
1067 if content:
1068 command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
1069 print self.name, ': command:', command
1070 (_, stdout, _) = self.ssh_conn.exec_command(command)
1071 content = stdout.read()
1072 if not content:
1073 command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
1074 '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
1075 ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + ' --listen-address ' + ip_range[0]
1076
1077 print self.name, ': command:', command
1078 (_, stdout, _) = self.ssh_conn.exec_command(command)
1079 content = stdout.readline()
1080
1081 if len(content) == 0:
1082 return True
1083 else:
1084 return False
1085
1086 def delete_dhcp_interfaces(self, vlan):
1087 """
1088 Create a linux bridge with STP active
1089 :param vlan: netowrk vlan id
1090 :return:
1091 """
1092
1093 if self.test:
1094 return
1095
1096 net_namespace = 'ovim-' + vlan
1097 command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + vlan
1098 print self.name, ': command:', command
1099 (_, stdout, _) = self.ssh_conn.exec_command(command)
1100 content = stdout.read()
1101
1102 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + vlan + ' down'
1103 print self.name, ': command:', command
1104 (_, stdout, _) = self.ssh_conn.exec_command(command)
1105 content = stdout.read()
1106
1107 command = 'sudo ip link set dev ovs-tap-' + vlan + ' down'
1108 print self.name, ': command:', command
1109 (_, stdout, _) = self.ssh_conn.exec_command(command)
1110 content = stdout.read()
1111
1112 def create_dhcp_interfaces(self, vlan, ip, netmask):
1113 """
1114 Create a linux bridge with STP active
1115 :param vlan: segmentation id
1116 :param ip: Ip included in the dhcp range for the tap interface living in namesapce side
1117 :param netmask: dhcp net CIDR
1118 :return: True if success
1119 """
1120
1121 if self.test:
1122 return
1123
1124 net_namespace = 'ovim-' + vlan
1125 namespace_interface = 'tap-' + vlan
1126
1127 command = 'sudo ip netns add ' + net_namespace
1128 print self.name, ': command:', command
1129 (_, stdout, _) = self.ssh_conn.exec_command(command)
1130 content = stdout.read()
1131
1132 command = 'sudo ip link add tap-' + vlan + ' type veth peer name ovs-tap-' + vlan
1133 print self.name, ': command:', command
1134 (_, stdout, _) = self.ssh_conn.exec_command(command)
1135 content = stdout.read()
1136
1137 command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + vlan + ' tag=' + vlan
1138 print self.name, ': command:', command
1139 (_, stdout, _) = self.ssh_conn.exec_command(command)
1140 content = stdout.read()
1141
1142 command = 'sudo ip link set tap-' + vlan + ' netns ' + net_namespace
1143 print self.name, ': command:', command
1144 (_, stdout, _) = self.ssh_conn.exec_command(command)
1145 content = stdout.read()
1146
1147 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + vlan + ' up'
1148 print self.name, ': command:', command
1149 (_, stdout, _) = self.ssh_conn.exec_command(command)
1150 content = stdout.read()
1151
1152 command = 'sudo ip link set dev ovs-tap-' + vlan + ' up'
1153 print self.name, ': command:', command
1154 (_, stdout, _) = self.ssh_conn.exec_command(command)
1155 content = stdout.read()
1156
1157 command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \
1158 + ' ' + ip + ' netmask ' + netmask
1159 print self.name, ': command:', command
1160 (_, stdout, _) = self.ssh_conn.exec_command(command)
1161 content = stdout.read()
1162
1163 if len(content) == 0:
1164 return True
1165 else:
1166 return False
1167
1168 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
1169 """
1170 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1171 :param vxlan_interface: vlxan inteface name.
1172 :param remote_ip: tunnel endpoint remote compute ip.
1173 :return:
1174 """
1175 if self.test:
1176 return
1177 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
1178 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
1179 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
1180 print self.name, ': command:', command
1181 (_, stdout, _) = self.ssh_conn.exec_command(command)
1182 content = stdout.read()
1183 print content
1184 if len(content) == 0:
1185 return True
1186 else:
1187 return False
1188
1189 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
1190 """
1191 Delete a vlxan tunnel port from a OVS brdige.
1192 :param vxlan_interface: vlxan name to be delete it.
1193 :return: True if success.
1194 """
1195 if self.test:
1196 return
1197 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1198 print self.name, ': command:', command
1199 (_, stdout, _) = self.ssh_conn.exec_command(command)
1200 content = stdout.read()
1201 print content
1202 if len(content) == 0:
1203 return True
1204 else:
1205 return False
1206
1207 def delete_ovs_bridge(self):
1208 """
1209 Delete a OVS bridge from a compute.
1210 :return: True if success
1211 """
1212 if self.test:
1213 return
1214 command = 'sudo ovs-vsctl del-br br-int'
1215 print self.name, ': command:', command
1216 (_, stdout, _) = self.ssh_conn.exec_command(command)
1217 content = stdout.read()
1218 if len(content) == 0:
1219 return True
1220 else:
1221 return False
1222
1223 def get_file_info(self, path):
1224 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
1225 print self.name, ': command:', command
1226 (_, stdout, _) = self.ssh_conn.exec_command(command)
1227 content = stdout.read()
1228 if len(content) == 0:
1229 return None # file does not exist
1230 else:
1231 return content.split(" ") #(permission, 1, owner, group, size, date, file)
1232
1233 def qemu_get_info(self, path):
1234 command = 'qemu-img info ' + path
1235 print self.name, ': command:', command
1236 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
1237 content = stdout.read()
1238 if len(content) == 0:
1239 error = stderr.read()
1240 print self.name, ": get_qemu_info error ", error
1241 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
1242 else:
1243 try:
1244 return yaml.load(content)
1245 except yaml.YAMLError as exc:
1246 text = ""
1247 if hasattr(exc, 'problem_mark'):
1248 mark = exc.problem_mark
1249 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
1250 print self.name, ": get_qemu_info yaml format Exception", text
1251 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
1252
1253 def qemu_change_backing(self, inc_file, new_backing_file):
1254 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
1255 print self.name, ': command:', command
1256 (_, _, stderr) = self.ssh_conn.exec_command(command)
1257 content = stderr.read()
1258 if len(content) == 0:
1259 return 0
1260 else:
1261 print self.name, ": qemu_change_backing error: ", content
1262 return -1
1263
1264 def get_notused_filename(self, proposed_name, suffix=''):
1265 '''Look for a non existing file_name in the host
1266 proposed_name: proposed file name, includes path
1267 suffix: suffix to be added to the name, before the extention
1268 '''
1269 extension = proposed_name.rfind(".")
1270 slash = proposed_name.rfind("/")
1271 if extension < 0 or extension < slash: # no extension
1272 extension = len(proposed_name)
1273 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
1274 info = self.get_file_info(target_name)
1275 if info is None:
1276 return target_name
1277
1278 index=0
1279 while info is not None:
1280 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
1281 index+=1
1282 info = self.get_file_info(target_name)
1283 return target_name
1284
1285 def get_notused_path(self, proposed_path, suffix=''):
1286 '''Look for a non existing path at database for images
1287 proposed_path: proposed file name, includes path
1288 suffix: suffix to be added to the name, before the extention
1289 '''
1290 extension = proposed_path.rfind(".")
1291 if extension < 0:
1292 extension = len(proposed_path)
1293 if suffix != None:
1294 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
1295 index=0
1296 while True:
1297 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
1298 if r<=0:
1299 return target_path
1300 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
1301 index+=1
1302
1303
1304 def delete_file(self, file_name):
1305 command = 'rm -f '+file_name
1306 print self.name, ': command:', command
1307 (_, _, stderr) = self.ssh_conn.exec_command(command)
1308 error_msg = stderr.read()
1309 if len(error_msg) > 0:
1310 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
1311
1312 def copy_file(self, source, destination, perserve_time=True):
1313 if source[0:4]=="http":
1314 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1315 dst=destination, src=source, dst_result=destination + ".result" )
1316 else:
1317 command = 'cp --no-preserve=mode'
1318 if perserve_time:
1319 command += ' --preserve=timestamps'
1320 command += " '{}' '{}'".format(source, destination)
1321 print self.name, ': command:', command
1322 (_, _, stderr) = self.ssh_conn.exec_command(command)
1323 error_msg = stderr.read()
1324 if len(error_msg) > 0:
1325 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1326
    def copy_remote_file(self, remote_file, use_incremental):
        '''Copy a file from the image repository to the local image folder, recursively
        copying its backing files when the remote image is a qcow2 incremental chain.
        Reads and/or modifies self.localinfo['files'], the registry mapping remote
        paths to the unmodified local copies of repository images.
        params:
            remote_file: path of remote file (may be an http url)
            use_incremental: None (leave the decision to this function), True, False
        return:
            local_file: name of local file
            qemu_info: dict with qemu information of local file
            use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''

        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        # in case incremental use is not decided, take the decision depending on the image:
        # avoid the use of incremental if this image is already incremental.
        # http sources cannot be inspected with qemu-img, so they are never "local"
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        # copy recursively the backing files first, so the local chain is complete
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        # check if remote file is present locally; a cached copy is only reusable in
        # incremental mode (non-incremental copies get modified by the VM)
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                # local copy of file not valid because date (field 5) or size (field 4) differ
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None
                

        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            if use_incremental_out:
                # register the pristine copy so later launches can reuse it
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                # retarget the copied incremental image to the freshly copied backing file
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)
            
        return local_file, qemu_info, use_incremental_out
1394
    def launch_server(self, conn, server, rebuild=False, domain=None):
        """Create and start a libvirt domain for a server (VM) on this host.

        Copies the required images to the host (optionally as qcow2 incremental
        files), builds the domain XML and launches it through libvirt.
        :param conn: open libvirt connection
        :param server: server description dict with at least 'uuid'; may carry
            'paused', 'metadata', 'image_id'
        :param rebuild: when True, discard previous incremental files and recreate
        :param domain: pre-existing libvirt domain; when given (and not rebuild)
            the domain is simply resumed
        :return: (0, 'Success') or (<0, error_text)
        """
        if self.test:
            time.sleep(random.randint(20,150)) #sleep random time to make it a bit more real
            return 0, 'Success'

        server_id = server['uuid']
        paused = server.get('paused','no')
        try:
            # an already-defined domain only needs to be resumed (unless rebuilding)
            if domain!=None and rebuild==False:
                domain.resume()
                #self.server_status[server_id] = 'ACTIVE'
                return 0, 'Success'

            self.db_lock.acquire()
            result, server_data = self.db.get_instance(server_id)
            self.db_lock.release()
            if result <= 0:
                print self.name, ": launch_server ERROR getting server from DB",result, server_data
                return result, server_data

            #0: get image metadata
            server_metadata = server.get('metadata', {})
            use_incremental = None
             
            if "use_incremental" in server_metadata:
                use_incremental = False if server_metadata["use_incremental"]=="no" else True

            server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
            if rebuild:
                #delete previous incremental files
                for file_ in server_host_files.values():
                    self.delete_file(file_['source file'] )
                server_host_files={}
     
            #1: obtain aditional devices (disks)
            #Put as first device the main disk
            devices = [  {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ] 
            if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
                devices += server_data['extended']['devices']

            for dev in devices:
                if dev['image_id'] == None:
                    continue
                
                self.db_lock.acquire()
                result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
                                                    WHERE={'uuid': dev['image_id']})
                self.db_lock.release()
                if result <= 0:
                    # NOTE(review): this builds a tuple, not a formatted string; kept
                    # as-is to preserve behavior, but it is probably unintended
                    error_text = "ERROR", result, content, "when getting image", dev['image_id']
                    print self.name, ": launch_server", error_text 
                    return -1, error_text
                if content[0]['metadata'] is not None:
                    dev['metadata'] = json.loads(content[0]['metadata'])
                else:
                    dev['metadata'] = {}
                
                # image already copied to this host for this server: reuse it
                if dev['image_id'] in server_host_files:
                    dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
                    dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
                    continue
                
                #2: copy image to host
                remote_file = content[0]['path']
                use_incremental_image = use_incremental
                if dev['metadata'].get("use_incremental") == "no":
                    use_incremental_image = False
                local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
                
                #create incremental image
                if use_incremental_image:
                    local_file_inc = self.get_notused_filename(local_file, '.inc')
                    command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
                    print 'command:', command
                    (_, _, stderr) = self.ssh_conn.exec_command(command)
                    error_msg = stderr.read()
                    if len(error_msg) > 0:
                        raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
                    local_file = local_file_inc
                    qemu_info = {'file format':'qcow2'}
                
                server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}

                dev['source file'] = local_file 
                dev['file format'] = qemu_info['file format']

            # persist the per-server file registry (flushed later by the thread loop)
            self.localinfo['server_files'][ server['uuid'] ] = server_host_files
            self.localinfo_dirty = True

            #3 Create XML
            result, xml = self.create_xml_server(server_data, devices, server_metadata)  #local_file
            if result <0:
                print self.name, ": create xml server error:", xml
                return -2, xml
            print self.name, ": create xml:", xml
            atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
            #4 Start the domain
            if not rebuild: #ensures that any pending destroying server is done
                self.server_forceoff(True)
            #print self.name, ": launching instance" #, xml
            conn.createXML(xml, atribute)
            #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'

            return 0, 'Success'

        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
            if "SSH session not active" in text:
                self.ssh_connect()
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
        except Exception as e:
            text = str(e)
            print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
        return -1, text
1512
    def update_servers_status(self):
        """Poll libvirt for the state of every tracked domain and sync the DB.

        Maps libvirt domain states to openvim statuses and updates the
        'instances' table (and self.server_status) on any change. A domain no
        longer present in libvirt is considered INACTIVE, except that an ERROR
        status is kept (the machine is obviously not running).
        Does nothing in test mode or when no server is tracked.
        """
        # # virDomainState
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7   #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            domains=  conn.listAllDomains()
            domain_dict={}
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            print self.name, ": get_state() Exception '", e.get_error_message()
            return

        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                # domain disappeared from libvirt: treat as stopped
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            #change status
            print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            if r>=0:
                # only mirror the new status locally when the DB update succeeded
                self.server_status[server_id] = new_status
1572
    def action_on_server(self, req, last_retry=True):
        '''Perform an action (start/stop/terminate/pause/...) on a server request.
        Attributes:
            req: dictionary that contains:
                server properties: 'uuid','name','tenant_id','status'
                action: 'action' (dict whose keys select the operation)
                host properties: 'user', 'ip_name'
            last_retry: when False, an ERROR outcome is not written to the DB
                because another retry will follow
        return (error, text)
             0: No error. VM is updated to new state,
            -1: Invalid action, as trying to pause a PAUSED VM
            -2: Error accessing host
            -3: VM nor present
            -4: Error at DB access
            -5: Error while trying to perform action. VM is updated to ERROR
        '''
        server_id = req['uuid']
        conn = None
        new_status = None
        old_status = req['status']
        last_error = None

        if self.test:
            # test mode: just simulate the state transitions with small delays
            if 'terminate' in req['action']:
                new_status = 'deleted'
            elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
                if req['status']!='ERROR':
                    time.sleep(5)
                    new_status = 'INACTIVE'
            elif 'start' in req['action']  and req['status']!='ERROR': new_status = 'ACTIVE'
            elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
            elif 'pause' in req['action']  and req['status']!='ERROR': new_status = 'PAUSED'
            elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
            elif 'rebuild' in req['action']:
                time.sleep(random.randint(20,150))
                new_status = 'ACTIVE'
            elif 'createImage' in req['action']:
                time.sleep(5)
                self.create_image(None, req)
        else:
            try:
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                try:
                    dom = conn.lookupByUUIDString(server_id)
                except host_thread.lvirt_module.libvirtError as e:
                    text = e.get_error_message()
                    # "domain not found" is expected for stopped VMs; anything else is fatal
                    if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                        dom = None
                    else:
                        print self.name, ": action_on_server(",server_id,") libvirt exception:", text
                        raise e

                if 'forceOff' in req['action']:
                    if dom == None:
                        print self.name, ": action_on_server(",server_id,") domain not running"
                    else:
                        try:
                            print self.name, ": sending DESTROY to server", server_id
                            dom.destroy()
                        except Exception as e:
                            # an already-stopped domain is not an error for forceOff
                            if "domain is not running" not in e.get_error_message():
                                print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
                                last_error =  'action_on_server Exception while destroy: ' + e.get_error_message()
                                new_status = 'ERROR'

                elif 'terminate' in req['action']:
                    if dom == None:
                        print self.name, ": action_on_server(",server_id,") domain not running"
                        new_status = 'deleted'
                    else:
                        try:
                            if req['action']['terminate'] == 'force':
                                print self.name, ": sending DESTROY to server", server_id
                                dom.destroy()
                                new_status = 'deleted'
                            else:
                                # graceful path: shutdown now, check again in 10 seconds
                                print self.name, ": sending SHUTDOWN to server", server_id
                                dom.shutdown()
                                self.pending_terminate_server.append( (time.time()+10,server_id) )
                        except Exception as e:
                            print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
                            last_error =  'action_on_server Exception while destroy: ' + e.get_error_message()
                            new_status = 'ERROR'
                            if "domain is not running" in e.get_error_message():
                                try:
                                    dom.undefine()
                                    new_status = 'deleted'
                                except Exception:
                                    print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
                                    # NOTE(review): the comma makes this a tuple, not a
                                    # concatenated string; kept as-is to preserve behavior
                                    last_error =  'action_on_server Exception2 while undefine:', e.get_error_message()
                                    #Exception: 'virDomainDetachDevice() failed'
                    if new_status=='deleted':
                        # clean local tracking and the copied image files of the server
                        if server_id in self.server_status:
                            del self.server_status[server_id]
                        if req['uuid'] in self.localinfo['server_files']:
                            for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
                                try:
                                    self.delete_file(file_['source file'])
                                except Exception:
                                    pass
                            del self.localinfo['server_files'][ req['uuid'] ]
                            self.localinfo_dirty = True

                elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
                    try:
                        if dom == None:
                            print self.name, ": action_on_server(",server_id,") domain not running"
                        else:
                            dom.shutdown()
#                            new_status = 'INACTIVE'
                        #TODO: check status for changing at database
                    except Exception as e:
                        new_status = 'ERROR'
                        print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
                        last_error =  'action_on_server Exception while shutdown: ' + e.get_error_message()

                elif 'rebuild' in req['action']:
                    if dom != None:
                        dom.destroy()
                    r = self.launch_server(conn, req, True, None)
                    if r[0] <0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'
                elif 'start' in req['action']:
                    # The instance is only create in DB but not yet at libvirt domain, needs to be create
                    rebuild = True if req['action']['start'] == 'rebuild' else False
                    r = self.launch_server(conn, req, rebuild, dom)
                    if r[0] <0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'

                elif 'resume' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.resume()
#                            new_status = 'ACTIVE'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()

                elif 'pause' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.suspend()
#                            new_status = 'PAUSED'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()

                elif 'reboot' in req['action']:
                    try:
                        if dom == None:
                            pass
                        else:
                            dom.reboot()
                        print self.name, ": action_on_server(",server_id,") reboot:"
                        #new_status = 'ACTIVE'
                    except Exception as e:
                        print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
                elif 'createImage' in req['action']:
                    self.create_image(dom, req)


                conn.close()
            except host_thread.lvirt_module.libvirtError as e:
                if conn is not None: conn.close()
                text = e.get_error_message()
                new_status = "ERROR"
                last_error = text
                print self.name, ": action_on_server(",server_id,") Exception '", text
                if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                    print self.name, ": action_on_server(",server_id,") Exception removed from host"
        #end of if self.test
        if new_status ==  None:
            return 1

        print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
        UPDATE = {'progress':100, 'status':new_status}

        if new_status=='ERROR':
            if not last_retry:  #if there will be another retry do not update database
                return -1
            elif 'terminate' in req['action']:
                #PUT a log in the database
                print self.name, ": PANIC deleting server", server_id, last_error
                self.db_lock.acquire()
                self.db.new_row('logs',
                            {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
                             'description':'PANIC deleting server from host '+self.name+': '+last_error}
                        )
                self.db_lock.release()
                if server_id in self.server_status:
                    del self.server_status[server_id]
                return -1
            else:
                UPDATE['last_error'] = last_error
        if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
            self.db_lock.acquire()
            self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
            self.server_status[server_id] = new_status
            self.db_lock.release()
            if new_status == 'ERROR':
                return -1
        return 1
1782
1783
1784 def restore_iface(self, name, mac, lib_conn=None):
1785 ''' make an ifdown, ifup to restore default parameter of na interface
1786 Params:
1787 mac: mac address of the interface
1788 lib_conn: connection to the libvirt, if None a new connection is created
1789 Return 0,None if ok, -1,text if fails
1790 '''
1791 conn=None
1792 ret = 0
1793 error_text=None
1794 if self.test:
1795 print self.name, ": restore_iface '%s' %s" % (name, mac)
1796 return 0, None
1797 try:
1798 if not lib_conn:
1799 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1800 else:
1801 conn = lib_conn
1802
1803 #wait to the pending VM deletion
1804 #TODO.Revise self.server_forceoff(True)
1805
1806 iface = conn.interfaceLookupByMACString(mac)
1807 iface.destroy()
1808 iface.create()
1809 print self.name, ": restore_iface '%s' %s" % (name, mac)
1810 except host_thread.lvirt_module.libvirtError as e:
1811 error_text = e.get_error_message()
1812 print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
1813 ret=-1
1814 finally:
1815 if lib_conn is None and conn is not None:
1816 conn.close()
1817 return ret, error_text
1818
1819
1820 def create_image(self,dom, req):
1821 if self.test:
1822 if 'path' in req['action']['createImage']:
1823 file_dst = req['action']['createImage']['path']
1824 else:
1825 createImage=req['action']['createImage']
1826 img_name= createImage['source']['path']
1827 index=img_name.rfind('/')
1828 file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
1829 image_status='ACTIVE'
1830 else:
1831 for retry in (0,1):
1832 try:
1833 server_id = req['uuid']
1834 createImage=req['action']['createImage']
1835 file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
1836 if 'path' in req['action']['createImage']:
1837 file_dst = req['action']['createImage']['path']
1838 else:
1839 img_name= createImage['source']['path']
1840 index=img_name.rfind('/')
1841 file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
1842
1843 self.copy_file(file_orig, file_dst)
1844 qemu_info = self.qemu_get_info(file_orig)
1845 if 'backing file' in qemu_info:
1846 for k,v in self.localinfo['files'].items():
1847 if v==qemu_info['backing file']:
1848 self.qemu_change_backing(file_dst, k)
1849 break
1850 image_status='ACTIVE'
1851 break
1852 except paramiko.ssh_exception.SSHException as e:
1853 image_status='ERROR'
1854 error_text = e.args[0]
1855 print self.name, "': create_image(",server_id,") ssh Exception:", error_text
1856 if "SSH session not active" in error_text and retry==0:
1857 self.ssh_connect()
1858 except Exception as e:
1859 image_status='ERROR'
1860 error_text = str(e)
1861 print self.name, "': create_image(",server_id,") Exception:", error_text
1862
1863 #TODO insert a last_error at database
1864 self.db_lock.acquire()
1865 self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
1866 {'uuid':req['new_image']['uuid']}, log=True)
1867 self.db_lock.release()
1868
    def edit_iface(self, port_id, old_net, new_net):
        """Re-plug a SR-IOV (VF) interface of a running instance so it picks up
        the parameters of its (possibly new) network.
        Params:
            port_id: uuid of the port at the 'ports' database table
            old_net: truthy when the interface must first be detached
            new_net: truthy when the interface must be (re)attached
        Returns nothing; errors are only printed.
        """
        #This action imply remove and insert interface to put proper parameters
        if self.test:
            time.sleep(1)  # simulate the libvirt work in test mode
        else:
            #get iface details
            self.db_lock.acquire()
            r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                WHERE={'port_id': port_id})
            self.db_lock.release()
            if r<0:
                print self.name, ": edit_iface(",port_id,") DDBB error:", c
                return
            elif r==0:
                # NOTE: message contains a typo ("por" for "port"); kept as-is
                print self.name, ": edit_iface(",port_id,") por not found"
                return
            port=c[0]
            # only virtual functions (SR-IOV VF) can be hot re-plugged this way
            if port["model"]!="VF":
                print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
                return
            #create xml detach file
            xml=[]
            # xml_level is read by self.pci2xml to indent the generated address element
            self.xml_level = 2
            xml.append("<interface type='hostdev' managed='yes'>")
            xml.append(" <mac address='" +port['mac']+ "'/>")
            xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
            xml.append('</interface>')


            try:
                conn=None
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                dom = conn.lookupByUUIDString(port["instance_id"])
                if old_net:
                    text="\n".join(xml)
                    print self.name, ": edit_iface detaching SRIOV interface", text
                    dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
                if new_net:
                    # reuse the detach XML: replace the closing tag with the vlan
                    # element, then add the guest pci address and close again
                    xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
                    self.xml_level = 1
                    xml.append(self.pci2xml(port.get('vpci',None)) )
                    xml.append('</interface>')
                    text="\n".join(xml)
                    print self.name, ": edit_iface attaching SRIOV interface", text
                    dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text

            finally:
                if conn is not None: conn.close()
1921
1922
1923 def create_server(server, db, db_lock, only_of_ports):
1924 #print "server"
1925 #print "server"
1926 #print server
1927 #print "server"
1928 #print "server"
1929 #try:
1930 # host_id = server.get('host_id', None)
1931 extended = server.get('extended', None)
1932
1933 # print '----------------------'
1934 # print json.dumps(extended, indent=4)
1935
1936 requirements={}
1937 requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1938 requirements['ram'] = server['flavor'].get('ram', 0)
1939 if requirements['ram']== None:
1940 requirements['ram'] = 0
1941 requirements['vcpus'] = server['flavor'].get('vcpus', 0)
1942 if requirements['vcpus']== None:
1943 requirements['vcpus'] = 0
1944 #If extended is not defined get requirements from flavor
1945 if extended is None:
1946 #If extended is defined in flavor convert to dictionary and use it
1947 if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
1948 json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
1949 extended = json.loads(json_acceptable_string)
1950 else:
1951 extended = None
1952 #print json.dumps(extended, indent=4)
1953
1954 #For simplicity only one numa VM are supported in the initial implementation
1955 if extended != None:
1956 numas = extended.get('numas', [])
1957 if len(numas)>1:
1958 return (-2, "Multi-NUMA VMs are not supported yet")
1959 #elif len(numas)<1:
1960 # return (-1, "At least one numa must be specified")
1961
1962 #a for loop is used in order to be ready to multi-NUMA VMs
1963 request = []
1964 for numa in numas:
1965 numa_req = {}
1966 numa_req['memory'] = numa.get('memory', 0)
1967 if 'cores' in numa:
1968 numa_req['proc_req_nb'] = numa['cores'] #number of cores or threads to be reserved
1969 numa_req['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1970 numa_req['proc_req_list'] = numa.get('cores-id', None) #list of ids to be assigned to the cores or threads
1971 elif 'paired-threads' in numa:
1972 numa_req['proc_req_nb'] = numa['paired-threads']
1973 numa_req['proc_req_type'] = 'paired-threads'
1974 numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
1975 elif 'threads' in numa:
1976 numa_req['proc_req_nb'] = numa['threads']
1977 numa_req['proc_req_type'] = 'threads'
1978 numa_req['proc_req_list'] = numa.get('threads-id', None)
1979 else:
1980 numa_req['proc_req_nb'] = 0 # by default
1981 numa_req['proc_req_type'] = 'threads'
1982
1983
1984
1985 #Generate a list of sriov and another for physical interfaces
1986 interfaces = numa.get('interfaces', [])
1987 sriov_list = []
1988 port_list = []
1989 for iface in interfaces:
1990 iface['bandwidth'] = int(iface['bandwidth'])
1991 if iface['dedicated'][:3]=='yes':
1992 port_list.append(iface)
1993 else:
1994 sriov_list.append(iface)
1995
1996 #Save lists ordered from more restrictive to less bw requirements
1997 numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
1998 numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)
1999
2000
2001 request.append(numa_req)
2002
2003 # print "----------\n"+json.dumps(request[0], indent=4)
2004 # print '----------\n\n'
2005
2006 #Search in db for an appropriate numa for each requested numa
2007 #at the moment multi-NUMA VMs are not supported
2008 if len(request)>0:
2009 requirements['numa'].update(request[0])
2010 if requirements['numa']['memory']>0:
2011 requirements['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
2012 elif requirements['ram']==0:
2013 return (-1, "Memory information not set neither at extended field not at ram")
2014 if requirements['numa']['proc_req_nb']>0:
2015 requirements['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
2016 elif requirements['vcpus']==0:
2017 return (-1, "Processor information not set neither at extended field not at vcpus")
2018
2019
2020 db_lock.acquire()
2021 result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
2022 db_lock.release()
2023
2024 if result == -1:
2025 return (-1, content)
2026
2027 numa_id = content['numa_id']
2028 host_id = content['host_id']
2029
2030 #obtain threads_id and calculate pinning
2031 cpu_pinning = []
2032 reserved_threads=[]
2033 if requirements['numa']['proc_req_nb']>0:
2034 db_lock.acquire()
2035 result, content = db.get_table(FROM='resources_core',
2036 SELECT=('id','core_id','thread_id'),
2037 WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
2038 db_lock.release()
2039 if result <= 0:
2040 print content
2041 return -1, content
2042
2043 #convert rows to a dictionary indexed by core_id
2044 cores_dict = {}
2045 for row in content:
2046 if not row['core_id'] in cores_dict:
2047 cores_dict[row['core_id']] = []
2048 cores_dict[row['core_id']].append([row['thread_id'],row['id']])
2049
2050 #In case full cores are requested
2051 paired = 'N'
2052 if requirements['numa']['proc_req_type'] == 'cores':
2053 #Get/create the list of the vcpu_ids
2054 vcpu_id_list = requirements['numa']['proc_req_list']
2055 if vcpu_id_list == None:
2056 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
2057
2058 for threads in cores_dict.itervalues():
2059 #we need full cores
2060 if len(threads) != 2:
2061 continue
2062
2063 #set pinning for the first thread
2064 cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )
2065
2066 #reserve so it is not used the second thread
2067 reserved_threads.append(threads[1][1])
2068
2069 if len(vcpu_id_list) == 0:
2070 break
2071
2072 #In case paired threads are requested
2073 elif requirements['numa']['proc_req_type'] == 'paired-threads':
2074 paired = 'Y'
2075 #Get/create the list of the vcpu_ids
2076 if requirements['numa']['proc_req_list'] != None:
2077 vcpu_id_list = []
2078 for pair in requirements['numa']['proc_req_list']:
2079 if len(pair)!=2:
2080 return -1, "Field paired-threads-id not properly specified"
2081 return
2082 vcpu_id_list.append(pair[0])
2083 vcpu_id_list.append(pair[1])
2084 else:
2085 vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))
2086
2087 for threads in cores_dict.itervalues():
2088 #we need full cores
2089 if len(threads) != 2:
2090 continue
2091 #set pinning for the first thread
2092 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
2093
2094 #set pinning for the second thread
2095 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
2096
2097 if len(vcpu_id_list) == 0:
2098 break
2099
2100 #In case normal threads are requested
2101 elif requirements['numa']['proc_req_type'] == 'threads':
2102 #Get/create the list of the vcpu_ids
2103 vcpu_id_list = requirements['numa']['proc_req_list']
2104 if vcpu_id_list == None:
2105 vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))
2106
2107 for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
2108 threads = cores_dict[threads_index]
2109 #set pinning for the first thread
2110 cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
2111
2112 #if exists, set pinning for the second thread
2113 if len(threads) == 2 and len(vcpu_id_list) != 0:
2114 cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
2115
2116 if len(vcpu_id_list) == 0:
2117 break
2118
2119 #Get the source pci addresses for the selected numa
2120 used_sriov_ports = []
2121 for port in requirements['numa']['sriov_list']:
2122 db_lock.acquire()
2123 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
2124 db_lock.release()
2125 if result <= 0:
2126 print content
2127 return -1, content
2128 for row in content:
2129 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
2130 continue
2131 port['pci'] = row['pci']
2132 if 'mac_address' not in port:
2133 port['mac_address'] = row['mac']
2134 del port['mac']
2135 port['port_id']=row['id']
2136 port['Mbps_used'] = port['bandwidth']
2137 used_sriov_ports.append(row['id'])
2138 break
2139
2140 for port in requirements['numa']['port_list']:
2141 port['Mbps_used'] = None
2142 if port['dedicated'] != "yes:sriov":
2143 port['mac_address'] = port['mac']
2144 del port['mac']
2145 continue
2146 db_lock.acquire()
2147 result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
2148 db_lock.release()
2149 if result <= 0:
2150 print content
2151 return -1, content
2152 port['Mbps_used'] = content[0]['Mbps']
2153 for row in content:
2154 if row['id'] in used_sriov_ports or row['id']==port['port_id']:
2155 continue
2156 port['pci'] = row['pci']
2157 if 'mac_address' not in port:
2158 port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports
2159 del port['mac']
2160 port['port_id']=row['id']
2161 used_sriov_ports.append(row['id'])
2162 break
2163
2164 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
2165 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
2166
2167 server['host_id'] = host_id
2168
2169
2170 #Generate dictionary for saving in db the instance resources
2171 resources = {}
2172 resources['bridged-ifaces'] = []
2173
2174 numa_dict = {}
2175 numa_dict['interfaces'] = []
2176
2177 numa_dict['interfaces'] += requirements['numa']['port_list']
2178 numa_dict['interfaces'] += requirements['numa']['sriov_list']
2179
2180 #Check bridge information
2181 unified_dataplane_iface=[]
2182 unified_dataplane_iface += requirements['numa']['port_list']
2183 unified_dataplane_iface += requirements['numa']['sriov_list']
2184
2185 for control_iface in server.get('networks', []):
2186 control_iface['net_id']=control_iface.pop('uuid')
2187 #Get the brifge name
2188 db_lock.acquire()
2189 result, content = db.get_table(FROM='nets',
2190 SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
2191 'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
2192 WHERE={'uuid': control_iface['net_id']})
2193 db_lock.release()
2194 if result < 0:
2195 pass
2196 elif result==0:
2197 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
2198 else:
2199 network=content[0]
2200 if control_iface.get("type", 'virtual') == 'virtual':
2201 if network['type']!='bridge_data' and network['type']!='bridge_man':
2202 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
2203 resources['bridged-ifaces'].append(control_iface)
2204 if network.get("provider") and network["provider"][0:3] == "OVS":
2205 control_iface["type"] = "instance:ovs"
2206 else:
2207 control_iface["type"] = "instance:bridge"
2208 if network.get("vlan"):
2209 control_iface["vlan"] = network["vlan"]
2210
2211 if network.get("enable_dhcp") == 'true':
2212 control_iface["enable_dhcp"] = network.get("enable_dhcp")
2213 control_iface["dhcp_first_ip"] = network["dhcp_first_ip"]
2214 control_iface["dhcp_last_ip"] = network["dhcp_last_ip"]
2215 control_iface["cidr"] = network["cidr"]
2216 else:
2217 if network['type']!='data' and network['type']!='ptp':
2218 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
2219 #dataplane interface, look for it in the numa tree and asign this network
2220 iface_found=False
2221 for dataplane_iface in numa_dict['interfaces']:
2222 if dataplane_iface['name'] == control_iface.get("name"):
2223 if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
2224 (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
2225 (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
2226 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
2227 (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
2228 dataplane_iface['uuid'] = control_iface['net_id']
2229 if dataplane_iface['dedicated'] == "no":
2230 dataplane_iface['vlan'] = network['vlan']
2231 if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
2232 dataplane_iface['mac_address'] = control_iface.get("mac_address")
2233 if control_iface.get("vpci"):
2234 dataplane_iface['vpci'] = control_iface.get("vpci")
2235 iface_found=True
2236 break
2237 if not iface_found:
2238 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")
2239
2240 resources['host_id'] = host_id
2241 resources['image_id'] = server['image_id']
2242 resources['flavor_id'] = server['flavor_id']
2243 resources['tenant_id'] = server['tenant_id']
2244 resources['ram'] = requirements['ram']
2245 resources['vcpus'] = requirements['vcpus']
2246 resources['status'] = 'CREATING'
2247
2248 if 'description' in server: resources['description'] = server['description']
2249 if 'name' in server: resources['name'] = server['name']
2250
2251 resources['extended'] = {} #optional
2252 resources['extended']['numas'] = []
2253 numa_dict['numa_id'] = numa_id
2254 numa_dict['memory'] = requirements['numa']['memory']
2255 numa_dict['cores'] = []
2256
2257 for core in cpu_pinning:
2258 numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
2259 for core in reserved_threads:
2260 numa_dict['cores'].append({'id': core})
2261 resources['extended']['numas'].append(numa_dict)
2262 if extended!=None and 'devices' in extended: #TODO allow extra devices without numa
2263 resources['extended']['devices'] = extended['devices']
2264
2265
2266 print '===================================={'
2267 print json.dumps(resources, indent=4)
2268 print '====================================}'
2269
2270 return 0, resources
2271