d8bca2e2e1ca13d091a34566d29610672f520cb8
[osm/openvim.git] / osm_openvim / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
'''
Thread that interacts with a compute host (over SSH) and with libvirt to manage its VMs.
One thread is launched per host.
'''
__author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
__date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 from jsonschema import validate as js_v, exceptions as js_e
38 #import libvirt
39 import imp
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 import os
43
44 #TODO: insert a logging system
45
46 # from logging import Logger
47 # import auxiliary_functions as af
48
49 # TODO: insert a logging system
50
51
52 class host_thread(threading.Thread):
53 lvirt_module = None
54
55 def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
56 develop_bridge_iface):
57 '''Init a thread.
58 Arguments:
59 'id' number of thead
60 'name' name of thread
61 'host','user': host ip or name to manage and user
62 'db', 'db_lock': database class and lock to use it in exclusion
63 '''
64 threading.Thread.__init__(self)
65 self.name = name
66 self.host = host
67 self.user = user
68 self.db = db
69 self.db_lock = db_lock
70 self.test = test
71
72 if not test and not host_thread.lvirt_module:
73 try:
74 module_info = imp.find_module("libvirt")
75 host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
76 except (IOError, ImportError) as e:
77 raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))
78
79
80 self.develop_mode = develop_mode
81 self.develop_bridge_iface = develop_bridge_iface
82 self.image_path = image_path
83 self.host_id = host_id
84 self.version = version
85
86 self.xml_level = 0
87 #self.pending ={}
88
89 self.server_status = {} #dictionary with pairs server_uuid:server_status
90 self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
91 self.next_update_server_status = 0 #time when must be check servers status
92
93 self.hostinfo = None
94
95 self.queueLock = threading.Lock()
96 self.taskQueue = Queue.Queue(2000)
97 self.ssh_conn = None
98
99 def ssh_connect(self):
100 try:
101 #Connect SSH
102 self.ssh_conn = paramiko.SSHClient()
103 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
104 self.ssh_conn.load_system_host_keys()
105 self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
106 except paramiko.ssh_exception.SSHException as e:
107 text = e.args[0]
108 print self.name, ": ssh_connect ssh Exception:", text
109
110 def load_localinfo(self):
111 if not self.test:
112 try:
113 #Connect SSH
114 self.ssh_connect()
115
116 command = 'mkdir -p ' + self.image_path
117 #print self.name, ': command:', command
118 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
119 content = stderr.read()
120 if len(content) > 0:
121 print self.name, ': command:', command, "stderr:", content
122
123 command = 'cat ' + self.image_path + '/.openvim.yaml'
124 #print self.name, ': command:', command
125 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
126 content = stdout.read()
127 if len(content) == 0:
128 print self.name, ': command:', command, "stderr:", stderr.read()
129 raise paramiko.ssh_exception.SSHException("Error empty file ")
130 self.localinfo = yaml.load(content)
131 js_v(self.localinfo, localinfo_schema)
132 self.localinfo_dirty=False
133 if 'server_files' not in self.localinfo:
134 self.localinfo['server_files'] = {}
135 print self.name, ': localinfo load from host'
136 return
137
138 except paramiko.ssh_exception.SSHException as e:
139 text = e.args[0]
140 print self.name, ": load_localinfo ssh Exception:", text
141 except host_thread.lvirt_module.libvirtError as e:
142 text = e.get_error_message()
143 print self.name, ": load_localinfo libvirt Exception:", text
144 except yaml.YAMLError as exc:
145 text = ""
146 if hasattr(exc, 'problem_mark'):
147 mark = exc.problem_mark
148 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
149 print self.name, ": load_localinfo yaml format Exception", text
150 except js_e.ValidationError as e:
151 text = ""
152 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
153 print self.name, ": load_localinfo format Exception:", text, e.message
154 except Exception as e:
155 text = str(e)
156 print self.name, ": load_localinfo Exception:", text
157
158 #not loaded, insert a default data and force saving by activating dirty flag
159 self.localinfo = {'files':{}, 'server_files':{} }
160 #self.localinfo_dirty=True
161 self.localinfo_dirty=False
162
163 def load_hostinfo(self):
164 if self.test:
165 return;
166 try:
167 #Connect SSH
168 self.ssh_connect()
169
170
171 command = 'cat ' + self.image_path + '/hostinfo.yaml'
172 #print self.name, ': command:', command
173 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
174 content = stdout.read()
175 if len(content) == 0:
176 print self.name, ': command:', command, "stderr:", stderr.read()
177 raise paramiko.ssh_exception.SSHException("Error empty file ")
178 self.hostinfo = yaml.load(content)
179 js_v(self.hostinfo, hostinfo_schema)
180 print self.name, ': hostlinfo load from host', self.hostinfo
181 return
182
183 except paramiko.ssh_exception.SSHException as e:
184 text = e.args[0]
185 print self.name, ": load_hostinfo ssh Exception:", text
186 except host_thread.lvirt_module.libvirtError as e:
187 text = e.get_error_message()
188 print self.name, ": load_hostinfo libvirt Exception:", text
189 except yaml.YAMLError as exc:
190 text = ""
191 if hasattr(exc, 'problem_mark'):
192 mark = exc.problem_mark
193 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
194 print self.name, ": load_hostinfo yaml format Exception", text
195 except js_e.ValidationError as e:
196 text = ""
197 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
198 print self.name, ": load_hostinfo format Exception:", text, e.message
199 except Exception as e:
200 text = str(e)
201 print self.name, ": load_hostinfo Exception:", text
202
203 #not loaded, insert a default data
204 self.hostinfo = None
205
206 def save_localinfo(self, tries=3):
207 if self.test:
208 self.localinfo_dirty = False
209 return
210
211 while tries>=0:
212 tries-=1
213
214 try:
215 command = 'cat > ' + self.image_path + '/.openvim.yaml'
216 print self.name, ': command:', command
217 (stdin, _, _) = self.ssh_conn.exec_command(command)
218 yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
219 self.localinfo_dirty = False
220 break #while tries
221
222 except paramiko.ssh_exception.SSHException as e:
223 text = e.args[0]
224 print self.name, ": save_localinfo ssh Exception:", text
225 if "SSH session not active" in text:
226 self.ssh_connect()
227 except host_thread.lvirt_module.libvirtError as e:
228 text = e.get_error_message()
229 print self.name, ": save_localinfo libvirt Exception:", text
230 except yaml.YAMLError as exc:
231 text = ""
232 if hasattr(exc, 'problem_mark'):
233 mark = exc.problem_mark
234 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
235 print self.name, ": save_localinfo yaml format Exception", text
236 except Exception as e:
237 text = str(e)
238 print self.name, ": save_localinfo Exception:", text
239
240 def load_servers_from_db(self):
241 self.db_lock.acquire()
242 r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
243 self.db_lock.release()
244
245 self.server_status = {}
246 if r<0:
247 print self.name, ": Error getting data from database:", c
248 return
249 for server in c:
250 self.server_status[ server['uuid'] ] = server['status']
251
252 #convert from old version to new one
253 if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
254 server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
255 if server_files_dict['source file'][-5:] == 'qcow2':
256 server_files_dict['file format'] = 'qcow2'
257
258 self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
259 if 'inc_files' in self.localinfo:
260 del self.localinfo['inc_files']
261 self.localinfo_dirty = True
262
263 def delete_unused_files(self):
264 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
265 Deletes unused entries at self.loacalinfo and the corresponding local files.
266 The only reason for this mismatch is the manual deletion of instances (VM) at database
267 '''
268 if self.test:
269 return
270 for uuid,images in self.localinfo['server_files'].items():
271 if uuid not in self.server_status:
272 for localfile in images.values():
273 try:
274 print self.name, ": deleting file '%s' of unused server '%s'" %(localfile['source file'], uuid)
275 self.delete_file(localfile['source file'])
276 except paramiko.ssh_exception.SSHException as e:
277 print self.name, ": Exception deleting file '%s': %s" %(localfile['source file'], str(e))
278 del self.localinfo['server_files'][uuid]
279 self.localinfo_dirty = True
280
281 def insert_task(self, task, *aditional):
282 try:
283 self.queueLock.acquire()
284 task = self.taskQueue.put( (task,) + aditional, timeout=5)
285 self.queueLock.release()
286 return 1, None
287 except Queue.Full:
288 return -1, "timeout inserting a task over host " + self.name
289
    def run(self):
        """Thread main loop.

        Loads host state (localinfo, hostinfo, servers at database), removes unused files and then
        processes queued tasks. When the queue is empty it does one background job per iteration:
        saving a dirty localinfo, refreshing servers status every 5 seconds, or forcing the
        termination of servers in pending_terminate_server whose time has passed.
        Task 'exit' terminates the thread and returns 0; task 'reload' terminates and starts the
        outer loop again, reloading everything.
        """
        while True:
            # (re)load everything from host and database; repeated after a 'reload' task
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                # empty() + get() under queueLock, the same lock insert_task() holds while putting
                self.queueLock.acquire()
                if not self.taskQueue.empty():
                    task = self.taskQueue.get()
                else:
                    task = None
                self.queueLock.release()

                if task is None:
                    # idle: at most one background job per iteration, in this priority order
                    now=time.time()
                    if self.localinfo_dirty:
                        self.save_localinfo()
                    elif self.next_update_server_status < now:
                        self.update_servers_status()
                        self.next_update_server_status = now + 5
                    elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                        self.server_forceoff()
                    else:
                        time.sleep(1)
                    continue

                # task is a tuple (name, *arguments) as built by insert_task()
                if task[0] == 'instance':
                    print self.name, ": processing task instance", task[1]['action']
                    retry=0
                    # up to 2 attempts; the second argument is True only on the last attempt
                    while retry <2:
                        retry += 1
                        r=self.action_on_server(task[1], retry==2)
                        if r>=0:
                            break
                elif task[0] == 'image':
                    pass
                elif task[0] == 'exit':
                    print self.name, ": processing task exit"
                    self.terminate()
                    return 0
                elif task[0] == 'reload':
                    print self.name, ": processing task reload terminating and relaunching"
                    self.terminate()
                    break  # leave the task loop; the outer loop reloads host state
                elif task[0] == 'edit-iface':
                    print self.name, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task[1], task[2], task[3])
                    self.edit_iface(task[1], task[2], task[3])
                elif task[0] == 'restore-iface':
                    print self.name, ": processing task restore-iface %s mac=%s" % (task[1], task[2])
                    self.restore_iface(task[1], task[2])
                elif task[0] == 'new-ovsbridge':
                    print self.name, ": Creating compute OVS bridge"
                    self.create_ovs_bridge()
                elif task[0] == 'new-vxlan':
                    print self.name, ": Creating vxlan tunnel=" + task[1] + ", remote ip=" + task[2]
                    self.create_ovs_vxlan_tunnel(task[1], task[2])
                elif task[0] == 'del-ovsbridge':
                    print self.name, ": Deleting OVS bridge"
                    self.delete_ovs_bridge()
                elif task[0] == 'del-vxlan':
                    print self.name, ": Deleting vxlan " + task[1] + " tunnel"
                    self.delete_ovs_vxlan_tunnel(task[1])
                elif task[0] == 'create-ovs-bridge-port':
                    print self.name, ": Adding port ovim-" + task[1] + " to OVS bridge"
                    self.create_ovs_bridge_port(task[1])
                elif task[0] == 'del-ovs-port':
                    print self.name, ": Delete bridge attached to ovs port vlan {} net {}".format(task[1], task[2])
                    self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                else:
                    print self.name, ": unknown task", task
361
362 def server_forceoff(self, wait_until_finished=False):
363 while len(self.pending_terminate_server)>0:
364 now = time.time()
365 if self.pending_terminate_server[0][0]>now:
366 if wait_until_finished:
367 time.sleep(1)
368 continue
369 else:
370 return
371 req={'uuid':self.pending_terminate_server[0][1],
372 'action':{'terminate':'force'},
373 'status': None
374 }
375 self.action_on_server(req)
376 self.pending_terminate_server.pop(0)
377
378 def terminate(self):
379 try:
380 self.server_forceoff(True)
381 if self.localinfo_dirty:
382 self.save_localinfo()
383 if not self.test:
384 self.ssh_conn.close()
385 except Exception as e:
386 text = str(e)
387 print self.name, ": terminate Exception:", text
388 print self.name, ": exit from host_thread"
389
390 def get_local_iface_name(self, generic_name):
391 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
392 return self.hostinfo["iface_names"][generic_name]
393 return generic_name
394
395 def create_xml_server(self, server, dev_list, server_metadata={}):
396 """Function that implements the generation of the VM XML definition.
397 Additional devices are in dev_list list
398 The main disk is upon dev_list[0]"""
399
400 #get if operating system is Windows
401 windows_os = False
402 os_type = server_metadata.get('os_type', None)
403 if os_type == None and 'metadata' in dev_list[0]:
404 os_type = dev_list[0]['metadata'].get('os_type', None)
405 if os_type != None and os_type.lower() == "windows":
406 windows_os = True
407 #get type of hard disk bus
408 bus_ide = True if windows_os else False
409 bus = server_metadata.get('bus', None)
410 if bus == None and 'metadata' in dev_list[0]:
411 bus = dev_list[0]['metadata'].get('bus', None)
412 if bus != None:
413 bus_ide = True if bus=='ide' else False
414
415 self.xml_level = 0
416
417 text = "<domain type='kvm'>"
418 #get topology
419 topo = server_metadata.get('topology', None)
420 if topo == None and 'metadata' in dev_list[0]:
421 topo = dev_list[0]['metadata'].get('topology', None)
422 #name
423 name = server.get('name','') + "_" + server['uuid']
424 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
425 text += self.inc_tab() + "<name>" + name+ "</name>"
426 #uuid
427 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
428
429 numa={}
430 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
431 numa = server['extended']['numas'][0]
432 #memory
433 use_huge = False
434 memory = int(numa.get('memory',0))*1024*1024 #in KiB
435 if memory==0:
436 memory = int(server['ram'])*1024;
437 else:
438 if not self.develop_mode:
439 use_huge = True
440 if memory==0:
441 return -1, 'No memory assigned to instance'
442 memory = str(memory)
443 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
444 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
445 if use_huge:
446 text += self.tab()+'<memoryBacking>'+ \
447 self.inc_tab() + '<hugepages/>'+ \
448 self.dec_tab()+ '</memoryBacking>'
449
450 #cpu
451 use_cpu_pinning=False
452 vcpus = int(server.get("vcpus",0))
453 cpu_pinning = []
454 if 'cores-source' in numa:
455 use_cpu_pinning=True
456 for index in range(0, len(numa['cores-source'])):
457 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
458 vcpus += 1
459 if 'threads-source' in numa:
460 use_cpu_pinning=True
461 for index in range(0, len(numa['threads-source'])):
462 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
463 vcpus += 1
464 if 'paired-threads-source' in numa:
465 use_cpu_pinning=True
466 for index in range(0, len(numa['paired-threads-source'])):
467 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
468 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
469 vcpus += 2
470
471 if use_cpu_pinning and not self.develop_mode:
472 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
473 self.tab()+'<cputune>'
474 self.xml_level += 1
475 for i in range(0, len(cpu_pinning)):
476 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
477 text += self.dec_tab()+'</cputune>'+ \
478 self.tab() + '<numatune>' +\
479 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
480 self.dec_tab() + '</numatune>'
481 else:
482 if vcpus==0:
483 return -1, "Instance without number of cpus"
484 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
485
486 #boot
487 boot_cdrom = False
488 for dev in dev_list:
489 if dev['type']=='cdrom' :
490 boot_cdrom = True
491 break
492 text += self.tab()+ '<os>' + \
493 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
494 if boot_cdrom:
495 text += self.tab() + "<boot dev='cdrom'/>"
496 text += self.tab() + "<boot dev='hd'/>" + \
497 self.dec_tab()+'</os>'
498 #features
499 text += self.tab()+'<features>'+\
500 self.inc_tab()+'<acpi/>' +\
501 self.tab()+'<apic/>' +\
502 self.tab()+'<pae/>'+ \
503 self.dec_tab() +'</features>'
504 if topo == "oneSocket:hyperthreading":
505 if vcpus % 2 != 0:
506 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
507 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % vcpus/2
508 elif windows_os or topo == "oneSocket":
509 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
510 else:
511 text += self.tab() + "<cpu mode='host-model'></cpu>"
512 text += self.tab() + "<clock offset='utc'/>" +\
513 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
514 self.tab() + "<on_reboot>restart</on_reboot>" + \
515 self.tab() + "<on_crash>restart</on_crash>"
516 text += self.tab() + "<devices>" + \
517 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
518 self.tab() + "<serial type='pty'>" +\
519 self.inc_tab() + "<target port='0'/>" + \
520 self.dec_tab() + "</serial>" +\
521 self.tab() + "<console type='pty'>" + \
522 self.inc_tab()+ "<target type='serial' port='0'/>" + \
523 self.dec_tab()+'</console>'
524 if windows_os:
525 text += self.tab() + "<controller type='usb' index='0'/>" + \
526 self.tab() + "<controller type='ide' index='0'/>" + \
527 self.tab() + "<input type='mouse' bus='ps2'/>" + \
528 self.tab() + "<sound model='ich6'/>" + \
529 self.tab() + "<video>" + \
530 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
531 self.dec_tab() + "</video>" + \
532 self.tab() + "<memballoon model='virtio'/>" + \
533 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
534
535 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
536 #> self.dec_tab()+'</hostdev>\n' +\
537 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
538 if windows_os:
539 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
540 else:
541 #If image contains 'GRAPH' include graphics
542 #if 'GRAPH' in image:
543 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
544 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
545 self.dec_tab() + "</graphics>"
546
547 vd_index = 'a'
548 for dev in dev_list:
549 bus_ide_dev = bus_ide
550 if dev['type']=='cdrom' or dev['type']=='disk':
551 if dev['type']=='cdrom':
552 bus_ide_dev = True
553 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
554 if 'file format' in dev:
555 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
556 if 'source file' in dev:
557 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
558 #elif v['type'] == 'block':
559 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
560 #else:
561 # return -1, 'Unknown disk type ' + v['type']
562 vpci = dev.get('vpci',None)
563 if vpci == None:
564 vpci = dev['metadata'].get('vpci',None)
565 text += self.pci2xml(vpci)
566
567 if bus_ide_dev:
568 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
569 else:
570 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
571 text += self.dec_tab() + '</disk>'
572 vd_index = chr(ord(vd_index)+1)
573 elif dev['type']=='xml':
574 dev_text = dev['xml']
575 if 'vpci' in dev:
576 dev_text = dev_text.replace('__vpci__', dev['vpci'])
577 if 'source file' in dev:
578 dev_text = dev_text.replace('__file__', dev['source file'])
579 if 'file format' in dev:
580 dev_text = dev_text.replace('__format__', dev['source file'])
581 if '__dev__' in dev_text:
582 dev_text = dev_text.replace('__dev__', vd_index)
583 vd_index = chr(ord(vd_index)+1)
584 text += dev_text
585 else:
586 return -1, 'Unknown device type ' + dev['type']
587
588 net_nb=0
589 bridge_interfaces = server.get('networks', [])
590 for v in bridge_interfaces:
591 #Get the brifge name
592 self.db_lock.acquire()
593 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
594 self.db_lock.release()
595 if result <= 0:
596 print "create_xml_server ERROR getting nets",result, content
597 return -1, content
598 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
599 #I know it is not secure
600 #for v in sorted(desc['network interfaces'].itervalues()):
601 model = v.get("model", None)
602 if content[0]['provider']=='default':
603 text += self.tab() + "<interface type='network'>" + \
604 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
605 elif content[0]['provider'][0:7]=='macvtap':
606 text += self.tab()+"<interface type='direct'>" + \
607 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
608 self.tab() + "<target dev='macvtap0'/>"
609 if windows_os:
610 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
611 elif model==None:
612 model = "virtio"
613 elif content[0]['provider'][0:6]=='bridge':
614 text += self.tab() + "<interface type='bridge'>" + \
615 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
616 if windows_os:
617 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
618 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
619 elif model==None:
620 model = "virtio"
621 elif content[0]['provider'][0:3] == "OVS":
622 vlan = content[0]['provider'].replace('OVS:', '')
623 text += self.tab() + "<interface type='bridge'>" + \
624 self.inc_tab() + "<source bridge='ovim-" + vlan + "'/>"
625 else:
626 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
627 if model!=None:
628 text += self.tab() + "<model type='" +model+ "'/>"
629 if v.get('mac_address', None) != None:
630 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
631 text += self.pci2xml(v.get('vpci',None))
632 text += self.dec_tab()+'</interface>'
633
634 net_nb += 1
635
636 interfaces = numa.get('interfaces', [])
637
638 net_nb=0
639 for v in interfaces:
640 if self.develop_mode: #map these interfaces to bridges
641 text += self.tab() + "<interface type='bridge'>" + \
642 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
643 if windows_os:
644 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
645 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
646 else:
647 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
648 if v.get('mac_address', None) != None:
649 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
650 text += self.pci2xml(v.get('vpci',None))
651 text += self.dec_tab()+'</interface>'
652 continue
653
654 if v['dedicated'] == 'yes': #passthrought
655 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
656 self.inc_tab() + "<source>"
657 self.inc_tab()
658 text += self.pci2xml(v['source'])
659 text += self.dec_tab()+'</source>'
660 text += self.pci2xml(v.get('vpci',None))
661 if windows_os:
662 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
663 text += self.dec_tab()+'</hostdev>'
664 net_nb += 1
665 else: #sriov_interfaces
666 #skip not connected interfaces
667 if v.get("net_id") == None:
668 continue
669 text += self.tab() + "<interface type='hostdev' managed='yes'>"
670 self.inc_tab()
671 if v.get('mac_address', None) != None:
672 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
673 text+= self.tab()+'<source>'
674 self.inc_tab()
675 text += self.pci2xml(v['source'])
676 text += self.dec_tab()+'</source>'
677 if v.get('vlan',None) != None:
678 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
679 text += self.pci2xml(v.get('vpci',None))
680 if windows_os:
681 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
682 text += self.dec_tab()+'</interface>'
683
684
685 text += self.dec_tab()+'</devices>'+\
686 self.dec_tab()+'</domain>'
687 return 0, text
688
689 def pci2xml(self, pci):
690 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
691 alows an empty pci text'''
692 if pci is None:
693 return ""
694 first_part = pci.split(':')
695 second_part = first_part[2].split('.')
696 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
697 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
698 "' function='0x" + second_part[1] + "'/>"
699
700 def tab(self):
701 """Return indentation according to xml_level"""
702 return "\n" + (' '*self.xml_level)
703
704 def inc_tab(self):
705 """Increment and return indentation according to xml_level"""
706 self.xml_level += 1
707 return self.tab()
708
709 def dec_tab(self):
710 """Decrement and return indentation according to xml_level"""
711 self.xml_level -= 1
712 return self.tab()
713
714 def create_ovs_bridge(self):
715 """
716 Create a bridge in compute OVS to allocate VMs
717 :return: True if success
718 """
719 if self.test:
720 return
721 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
722 print self.name, ': command:', command
723 (_, stdout, _) = self.ssh_conn.exec_command(command)
724 content = stdout.read()
725 if len(content) == 0:
726 return True
727 else:
728 return False
729
730 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
731 """
732 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
733 :param vlan: vlan port id
734 :param net_uuid: network id
735 :return:
736 """
737
738 if self.test:
739 return
740
741 port_name = 'ovim-' + vlan
742 command = 'sudo ovs-vsctl del-port br-int ' + port_name
743 print self.name, ': command:', command
744 (_, stdout, _) = self.ssh_conn.exec_command(command)
745 content = stdout.read()
746 if len(content) == 0:
747 return True
748 else:
749 return False
750
751 def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
752 """
753 Delete dhcp server process lining in namespace
754 :param vlan: segmentation id
755 :param net_uuid: network uuid
756 :param dhcp_path: conf fiel path that live in namespace side
757 :return:
758 """
759 if self.test:
760 return
761 if not self.is_dhcp_port_free(vlan, net_uuid):
762 return True
763
764 net_namespace = 'ovim-' + vlan
765 dhcp_path = os.path.join(dhcp_path, net_namespace)
766 pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')
767
768 command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file
769 print self.name, ': command:', command
770 (_, stdout, _) = self.ssh_conn.exec_command(command)
771 content = stdout.read()
772
773 command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content
774 print self.name, ': command:', command
775 (_, stdout, _) = self.ssh_conn.exec_command(command)
776 content = stdout.read()
777
778 # if len(content) == 0:
779 # return True
780 # else:
781 # return False
782
783 def is_dhcp_port_free(self, host_id, net_uuid):
784 """
785 Check if any port attached to the a net in a vxlan mesh across computes nodes
786 :param host_id: host id
787 :param net_uuid: network id
788 :return: True if is not free
789 """
790 self.db_lock.acquire()
791 result, content = self.db.get_table(
792 FROM='ports',
793 WHERE={'p.type': 'instance:ovs', 'p.net_id': net_uuid}
794 )
795 self.db_lock.release()
796
797 if len(content) > 0:
798 return False
799 else:
800 return True
801
802 def is_port_free(self, host_id, net_uuid):
803 """
804 Check if there not ovs ports of a network in a compute host.
805 :param host_id: host id
806 :param net_uuid: network id
807 :return: True if is not free
808 """
809
810 self.db_lock.acquire()
811 result, content = self.db.get_table(
812 FROM='ports as p join instances as i on p.instance_id=i.uuid',
813 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
814 )
815 self.db_lock.release()
816
817 if len(content) > 0:
818 return False
819 else:
820 return True
821
822 def add_port_to_ovs_bridge(self, vlan):
823 """
824 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
825 :param vlan: vlan port id
826 :return: True if success
827 """
828
829 if self.test:
830 return
831
832 port_name = 'ovim-' + vlan
833 command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + vlan
834 print self.name, ': command:', command
835 (_, stdout, _) = self.ssh_conn.exec_command(command)
836 content = stdout.read()
837 if len(content) == 0:
838 return True
839 else:
840 return False
841
842 def delete_dhcp_port(self, vlan, net_uuid):
843 """
844 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
845 :param vlan: segmentation id
846 :param net_uuid: network id
847 :return: True if success
848 """
849
850 if self.test:
851 return
852
853 if not self.is_dhcp_port_free(vlan, net_uuid):
854 return True
855 self.delete_dhcp_interfaces(vlan)
856 return True
857
858 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
859 """
860 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
861 :param vlan:
862 :param net_uuid:
863 :return: True if success
864 """
865 if self.test:
866 return
867
868 if not self.is_port_free(vlan, net_uuid):
869 return True
870 self.delete_port_to_ovs_bridge(vlan, net_uuid)
871 self.delete_linux_bridge(vlan)
872 return True
873
874 def delete_linux_bridge(self, vlan):
875 """
876 Delete a linux bridge in a scpecific compute.
877 :param vlan: vlan port id
878 :return: True if success
879 """
880
881 if self.test:
882 return
883
884 port_name = 'ovim-' + vlan
885 command = 'sudo ip link set dev veth0-' + vlan + ' down'
886 print self.name, ': command:', command
887 (_, stdout, _) = self.ssh_conn.exec_command(command)
888 content = stdout.read()
889 #
890 # if len(content) != 0:
891 # return False
892
893 command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name
894 print self.name, ': command:', command
895 (_, stdout, _) = self.ssh_conn.exec_command(command)
896 content = stdout.read()
897 if len(content) == 0:
898 return True
899 else:
900 return False
901
def create_ovs_bridge_port(self, vlan):
    """
    Create the linux bridge for a vlan and attach it as a port of the OVS bridge,
    delegating to create_linux_bridge and add_port_to_ovs_bridge.
    :param vlan: vlan port id
    :return: None; the helpers' results are not propagated
    """
    if not self.test:
        self.create_linux_bridge(vlan)
        self.add_port_to_ovs_bridge(vlan)
912
913 def create_linux_bridge(self, vlan):
914 """
915 Create a linux bridge with STP active
916 :param vlan: netowrk vlan id
917 :return:
918 """
919
920 if self.test:
921 return
922
923 port_name = 'ovim-' + vlan
924 command = 'sudo brctl show | grep ' + port_name
925 print self.name, ': command:', command
926 (_, stdout, _) = self.ssh_conn.exec_command(command)
927 content = stdout.read()
928
929 # if exist nothing to create
930 # if len(content) == 0:
931 # return False
932
933 command = 'sudo brctl addbr ' + port_name
934 print self.name, ': command:', command
935 (_, stdout, _) = self.ssh_conn.exec_command(command)
936 content = stdout.read()
937
938 # if len(content) == 0:
939 # return True
940 # else:
941 # return False
942
943 command = 'sudo brctl stp ' + port_name + ' on'
944 print self.name, ': command:', command
945 (_, stdout, _) = self.ssh_conn.exec_command(command)
946 content = stdout.read()
947
948 # if len(content) == 0:
949 # return True
950 # else:
951 # return False
952 command = 'sudo ip link set dev ' + port_name + ' up'
953 print self.name, ': command:', command
954 (_, stdout, _) = self.ssh_conn.exec_command(command)
955 content = stdout.read()
956
957 if len(content) == 0:
958 return True
959 else:
960 return False
961
962 def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
963 """
964 Write into dhcp conf file a rule to assigned a fixed ip given to an specific MAC address
965 :param ip: IP address asigned to a VM
966 :param mac: VM vnic mac to be macthed with the IP received
967 :param vlan: Segmentation id
968 :param netmask: netmask value
969 :param path: dhcp conf file path that live in namespace side
970 :return: True if success
971 """
972
973 if self.test:
974 return
975
976 net_namespace = 'ovim-' + vlan
977 dhcp_path = os.path.join(dhcp_path, net_namespace)
978 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
979
980 if not ip:
981 return False
982
983 ip_data = mac.upper() + ',' + ip
984
985 command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
986 print self.name, ': command:', command
987 (_, stdout, _) = self.ssh_conn.exec_command(command)
988 content = stdout.read()
989
990 command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'
991
992 print self.name, ': command:', command
993 (_, stdout, _) = self.ssh_conn.exec_command(command)
994 content = stdout.read()
995
996 if len(content) == 0:
997 return True
998 else:
999 return False
1000
1001 def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path):
1002 """
1003 Delete into dhcp conf file the ip assigned to a specific MAC address
1004
1005 :param ip: IP address asigned to a VM
1006 :param mac: VM vnic mac to be macthed with the IP received
1007 :param vlan: Segmentation id
1008 :param dhcp_path: dhcp conf file path that live in namespace side
1009 :return:
1010 """
1011
1012 if self.test:
1013 return
1014
1015 net_namespace = 'ovim-' + vlan
1016 dhcp_path = os.path.join(dhcp_path, net_namespace)
1017 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1018
1019 if not ip:
1020 return False
1021
1022 ip_data = mac.upper() + ',' + ip
1023
1024 command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir
1025 print self.name, ': command:', command
1026 (_, stdout, _) = self.ssh_conn.exec_command(command)
1027 content = stdout.read()
1028
1029 if len(content) == 0:
1030 return True
1031 else:
1032 return False
1033
1034 def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway):
1035 """
1036 Generate a linux bridge and attache the port to a OVS bridge
1037 :param self:
1038 :param vlan: Segmentation id
1039 :param ip_range: IP dhcp range
1040 :param netmask: network netmask
1041 :param dhcp_path: dhcp conf file path that live in namespace side
1042 :param gateway: Gateway address for dhcp net
1043 :return: True if success
1044 """
1045
1046 if self.test:
1047 return
1048
1049 interface = 'tap-' + vlan
1050 net_namespace = 'ovim-' + vlan
1051 dhcp_path = os.path.join(dhcp_path, net_namespace)
1052 leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
1053 pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')
1054
1055 dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask
1056
1057 command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
1058 print self.name, ': command:', command
1059 (_, stdout, _) = self.ssh_conn.exec_command(command)
1060 content = stdout.read()
1061
1062 pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')
1063 command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
1064 print self.name, ': command:', command
1065 (_, stdout, _) = self.ssh_conn.exec_command(command)
1066 content = stdout.read()
1067 # check if pid is runing
1068 pid_status_path = content
1069 if content:
1070 command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
1071 print self.name, ': command:', command
1072 (_, stdout, _) = self.ssh_conn.exec_command(command)
1073 content = stdout.read()
1074 if not content:
1075 command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
1076 '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
1077 ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \
1078 ' --listen-address ' + gateway
1079
1080 print self.name, ': command:', command
1081 (_, stdout, _) = self.ssh_conn.exec_command(command)
1082 content = stdout.readline()
1083
1084 if len(content) == 0:
1085 return True
1086 else:
1087 return False
1088
1089 def delete_dhcp_interfaces(self, vlan):
1090 """
1091 Create a linux bridge with STP active
1092 :param vlan: netowrk vlan id
1093 :return:
1094 """
1095
1096 if self.test:
1097 return
1098
1099 net_namespace = 'ovim-' + vlan
1100 command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + vlan
1101 print self.name, ': command:', command
1102 (_, stdout, _) = self.ssh_conn.exec_command(command)
1103 content = stdout.read()
1104
1105 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + vlan + ' down'
1106 print self.name, ': command:', command
1107 (_, stdout, _) = self.ssh_conn.exec_command(command)
1108 content = stdout.read()
1109
1110 command = 'sudo ip link set dev ovs-tap-' + vlan + ' down'
1111 print self.name, ': command:', command
1112 (_, stdout, _) = self.ssh_conn.exec_command(command)
1113 content = stdout.read()
1114
1115 def create_dhcp_interfaces(self, vlan, ip, netmask):
1116 """
1117 Create a linux bridge with STP active
1118 :param vlan: segmentation id
1119 :param ip: Ip included in the dhcp range for the tap interface living in namesapce side
1120 :param netmask: dhcp net CIDR
1121 :return: True if success
1122 """
1123
1124 if self.test:
1125 return
1126
1127 net_namespace = 'ovim-' + vlan
1128 namespace_interface = 'tap-' + vlan
1129
1130 command = 'sudo ip netns add ' + net_namespace
1131 print self.name, ': command:', command
1132 (_, stdout, _) = self.ssh_conn.exec_command(command)
1133 content = stdout.read()
1134
1135 command = 'sudo ip link add tap-' + vlan + ' type veth peer name ovs-tap-' + vlan
1136 print self.name, ': command:', command
1137 (_, stdout, _) = self.ssh_conn.exec_command(command)
1138 content = stdout.read()
1139
1140 command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + vlan + ' tag=' + vlan
1141 print self.name, ': command:', command
1142 (_, stdout, _) = self.ssh_conn.exec_command(command)
1143 content = stdout.read()
1144
1145 command = 'sudo ip link set tap-' + vlan + ' netns ' + net_namespace
1146 print self.name, ': command:', command
1147 (_, stdout, _) = self.ssh_conn.exec_command(command)
1148 content = stdout.read()
1149
1150 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + vlan + ' up'
1151 print self.name, ': command:', command
1152 (_, stdout, _) = self.ssh_conn.exec_command(command)
1153 content = stdout.read()
1154
1155 command = 'sudo ip link set dev ovs-tap-' + vlan + ' up'
1156 print self.name, ': command:', command
1157 (_, stdout, _) = self.ssh_conn.exec_command(command)
1158 content = stdout.read()
1159
1160 command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \
1161 + ' ' + ip + ' netmask ' + netmask
1162 print self.name, ': command:', command
1163 (_, stdout, _) = self.ssh_conn.exec_command(command)
1164 content = stdout.read()
1165
1166 if len(content) == 0:
1167 return True
1168 else:
1169 return False
1170
1171 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
1172 """
1173 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1174 :param vxlan_interface: vlxan inteface name.
1175 :param remote_ip: tunnel endpoint remote compute ip.
1176 :return:
1177 """
1178 if self.test:
1179 return
1180 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
1181 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
1182 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
1183 print self.name, ': command:', command
1184 (_, stdout, _) = self.ssh_conn.exec_command(command)
1185 content = stdout.read()
1186 print content
1187 if len(content) == 0:
1188 return True
1189 else:
1190 return False
1191
1192 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
1193 """
1194 Delete a vlxan tunnel port from a OVS brdige.
1195 :param vxlan_interface: vlxan name to be delete it.
1196 :return: True if success.
1197 """
1198 if self.test:
1199 return
1200 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1201 print self.name, ': command:', command
1202 (_, stdout, _) = self.ssh_conn.exec_command(command)
1203 content = stdout.read()
1204 print content
1205 if len(content) == 0:
1206 return True
1207 else:
1208 return False
1209
1210 def delete_ovs_bridge(self):
1211 """
1212 Delete a OVS bridge from a compute.
1213 :return: True if success
1214 """
1215 if self.test:
1216 return
1217 command = 'sudo ovs-vsctl del-br br-int'
1218 print self.name, ': command:', command
1219 (_, stdout, _) = self.ssh_conn.exec_command(command)
1220 content = stdout.read()
1221 if len(content) == 0:
1222 return True
1223 else:
1224 return False
1225
1226 def get_file_info(self, path):
1227 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
1228 print self.name, ': command:', command
1229 (_, stdout, _) = self.ssh_conn.exec_command(command)
1230 content = stdout.read()
1231 if len(content) == 0:
1232 return None # file does not exist
1233 else:
1234 return content.split(" ") #(permission, 1, owner, group, size, date, file)
1235
1236 def qemu_get_info(self, path):
1237 command = 'qemu-img info ' + path
1238 print self.name, ': command:', command
1239 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
1240 content = stdout.read()
1241 if len(content) == 0:
1242 error = stderr.read()
1243 print self.name, ": get_qemu_info error ", error
1244 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
1245 else:
1246 try:
1247 return yaml.load(content)
1248 except yaml.YAMLError as exc:
1249 text = ""
1250 if hasattr(exc, 'problem_mark'):
1251 mark = exc.problem_mark
1252 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
1253 print self.name, ": get_qemu_info yaml format Exception", text
1254 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
1255
1256 def qemu_change_backing(self, inc_file, new_backing_file):
1257 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
1258 print self.name, ': command:', command
1259 (_, _, stderr) = self.ssh_conn.exec_command(command)
1260 content = stderr.read()
1261 if len(content) == 0:
1262 return 0
1263 else:
1264 print self.name, ": qemu_change_backing error: ", content
1265 return -1
1266
def get_notused_filename(self, proposed_name, suffix=''):
    '''Look for a file name that does not exist yet in the host
    proposed_name: proposed file name, includes path
    suffix: suffix inserted before the extension; a dot that belongs to a
        directory name is not considered an extension
    return: <stem><suffix><ext> if free, otherwise the first free
        <stem><suffix>-<n><ext> with n = 0, 1, ...
    '''
    dot = proposed_name.rfind(".")
    if dot < 0 or dot < proposed_name.rfind("/"):
        dot = len(proposed_name)
    stem, ext = proposed_name[:dot], proposed_name[dot:]

    candidate = stem + suffix + ext
    counter = 0
    while self.get_file_info(candidate) is not None:
        candidate = stem + suffix + "-" + str(counter) + ext
        counter += 1
    return candidate
1287
def get_notused_path(self, proposed_path, suffix=''):
    '''Look for a path not yet used by any image in the database
    proposed_path: proposed file name, includes path
    suffix: suffix to be added to the name, before the extension; None is
        treated as '' (it previously raised NameError/TypeError)
    return: <stem><suffix><ext> if no image uses it, otherwise the first free
        <stem><suffix>-<n><ext> with n = 0, 1, ...
    '''
    # NOTE(review): other DB accesses in this class hold self.db_lock; confirm that
    # callers of this method already hold it.
    if suffix is None:
        suffix = ''
    extension = proposed_path.rfind(".")
    # A dot inside a directory name is not an extension (same rule as get_notused_filename)
    if extension < 0 or extension < proposed_path.rfind("/"):
        extension = len(proposed_path)
    stem, ext = proposed_path[:extension], proposed_path[extension:]

    target_path = stem + suffix + ext
    index = 0
    while True:
        r, _ = self.db.get_table(FROM="images", WHERE={"path": target_path})
        if r <= 0:
            return target_path
        target_path = stem + suffix + "-" + str(index) + ext
        index += 1
1305
1306
1307 def delete_file(self, file_name):
1308 command = 'rm -f '+file_name
1309 print self.name, ': command:', command
1310 (_, _, stderr) = self.ssh_conn.exec_command(command)
1311 error_msg = stderr.read()
1312 if len(error_msg) > 0:
1313 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
1314
1315 def copy_file(self, source, destination, perserve_time=True):
1316 if source[0:4]=="http":
1317 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1318 dst=destination, src=source, dst_result=destination + ".result" )
1319 else:
1320 command = 'cp --no-preserve=mode'
1321 if perserve_time:
1322 command += ' --preserve=timestamps'
1323 command += " '{}' '{}'".format(source, destination)
1324 print self.name, ': command:', command
1325 (_, _, stderr) = self.ssh_conn.exec_command(command)
1326 error_msg = stderr.read()
1327 if len(error_msg) > 0:
1328 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1329
def copy_remote_file(self, remote_file, use_incremental):
    ''' Copy a file from the repository to the local image folder. When the
    remote file has a 'backing file', copy that backing file first (recursively)
    and rebase the local copy onto the local backing copy.
    Reads and/or modifies self.localinfo['files'], which maps remote paths to
    the unmodified local copies of images kept in the local image path.
    params:
        remote_file: path of remote file; values starting with "http" are
            downloaded and are not inspected with qemu-img before copying
        use_incremental: None (leave the decision to this function), True, False
    return:
        local_file: name of local file
        qemu_info: dict with qemu-img information of local file
        use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
    '''

    use_incremental_out = use_incremental
    new_backing_file = None
    local_file = None
    file_from_local = True

    #in case incremental use is not decided, take the decision depending on the image
    #avoid the use of incremental if this image is already incremental
    if remote_file[0:4] == "http":
        file_from_local = False
    if file_from_local:
        qemu_remote_info = self.qemu_get_info(remote_file)
    # qemu_remote_info only exists for non-http files; every use below is guarded by
    # "file_from_local and ...", so short-circuit evaluation avoids a NameError
    if use_incremental_out==None:
        use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
    #copy recursively the backing files; new_backing_file is the local copy of the backing file
    if file_from_local and 'backing file' in qemu_remote_info:
        new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

    #check if remote file is present locally (cached copies are only reused in incremental mode)
    if use_incremental_out and remote_file in self.localinfo['files']:
        local_file = self.localinfo['files'][remote_file]
        local_file_info = self.get_file_info(local_file)
        if file_from_local:
            remote_file_info = self.get_file_info(remote_file)
        if local_file_info == None:
            # cached copy vanished from disk
            local_file = None
        elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
            #local copy of file not valid because size ([4]) or date ([5]) are different.
            #TODO DELETE local file if this file is not used by any active virtual machine
            try:
                self.delete_file(local_file)
                del self.localinfo['files'][remote_file]
            except Exception:
                # best effort: a failed cleanup must not prevent a fresh copy
                pass
            local_file = None
        else: #check that the local file has the same backing file, or there is no backing file at all
            qemu_info = self.qemu_get_info(local_file)
            if new_backing_file != qemu_info.get('backing file'):
                local_file = None


    if local_file == None: #copy the file
        img_name= remote_file.split('/') [-1]
        img_local = self.image_path + '/' + img_name
        local_file = self.get_notused_filename(img_local)
        # timestamps are preserved only for incremental (cached) copies
        self.copy_file(remote_file, local_file, use_incremental_out)

        if use_incremental_out:
            # remember the unmodified copy so later calls can reuse it
            self.localinfo['files'][remote_file] = local_file
        if new_backing_file:
            self.qemu_change_backing(local_file, new_backing_file)
        qemu_info = self.qemu_get_info(local_file)

    return local_file, qemu_info, use_incremental_out
1397
1398 def launch_server(self, conn, server, rebuild=False, domain=None):
1399 if self.test:
1400 time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
1401 return 0, 'Success'
1402
1403 server_id = server['uuid']
1404 paused = server.get('paused','no')
1405 try:
1406 if domain!=None and rebuild==False:
1407 domain.resume()
1408 #self.server_status[server_id] = 'ACTIVE'
1409 return 0, 'Success'
1410
1411 self.db_lock.acquire()
1412 result, server_data = self.db.get_instance(server_id)
1413 self.db_lock.release()
1414 if result <= 0:
1415 print self.name, ": launch_server ERROR getting server from DB",result, server_data
1416 return result, server_data
1417
1418 #0: get image metadata
1419 server_metadata = server.get('metadata', {})
1420 use_incremental = None
1421
1422 if "use_incremental" in server_metadata:
1423 use_incremental = False if server_metadata["use_incremental"]=="no" else True
1424
1425 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
1426 if rebuild:
1427 #delete previous incremental files
1428 for file_ in server_host_files.values():
1429 self.delete_file(file_['source file'] )
1430 server_host_files={}
1431
1432 #1: obtain aditional devices (disks)
1433 #Put as first device the main disk
1434 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
1435 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
1436 devices += server_data['extended']['devices']
1437
1438 for dev in devices:
1439 if dev['image_id'] == None:
1440 continue
1441
1442 self.db_lock.acquire()
1443 result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
1444 WHERE={'uuid': dev['image_id']})
1445 self.db_lock.release()
1446 if result <= 0:
1447 error_text = "ERROR", result, content, "when getting image", dev['image_id']
1448 print self.name, ": launch_server", error_text
1449 return -1, error_text
1450 if content[0]['metadata'] is not None:
1451 dev['metadata'] = json.loads(content[0]['metadata'])
1452 else:
1453 dev['metadata'] = {}
1454
1455 if dev['image_id'] in server_host_files:
1456 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
1457 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
1458 continue
1459
1460 #2: copy image to host
1461 remote_file = content[0]['path']
1462 use_incremental_image = use_incremental
1463 if dev['metadata'].get("use_incremental") == "no":
1464 use_incremental_image = False
1465 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
1466
1467 #create incremental image
1468 if use_incremental_image:
1469 local_file_inc = self.get_notused_filename(local_file, '.inc')
1470 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
1471 print 'command:', command
1472 (_, _, stderr) = self.ssh_conn.exec_command(command)
1473 error_msg = stderr.read()
1474 if len(error_msg) > 0:
1475 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
1476 local_file = local_file_inc
1477 qemu_info = {'file format':'qcow2'}
1478
1479 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
1480
1481 dev['source file'] = local_file
1482 dev['file format'] = qemu_info['file format']
1483
1484 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
1485 self.localinfo_dirty = True
1486
1487 #3 Create XML
1488 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
1489 if result <0:
1490 print self.name, ": create xml server error:", xml
1491 return -2, xml
1492 print self.name, ": create xml:", xml
1493 atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
1494 #4 Start the domain
1495 if not rebuild: #ensures that any pending destroying server is done
1496 self.server_forceoff(True)
1497 #print self.name, ": launching instance" #, xml
1498 conn.createXML(xml, atribute)
1499 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
1500
1501 return 0, 'Success'
1502
1503 except paramiko.ssh_exception.SSHException as e:
1504 text = e.args[0]
1505 print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
1506 if "SSH session not active" in text:
1507 self.ssh_connect()
1508 except host_thread.lvirt_module.libvirtError as e:
1509 text = e.get_error_message()
1510 print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
1511 except Exception as e:
1512 text = str(e)
1513 print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
1514 return -1, text
1515
1516 def update_servers_status(self):
1517 # # virDomainState
1518 # VIR_DOMAIN_NOSTATE = 0
1519 # VIR_DOMAIN_RUNNING = 1
1520 # VIR_DOMAIN_BLOCKED = 2
1521 # VIR_DOMAIN_PAUSED = 3
1522 # VIR_DOMAIN_SHUTDOWN = 4
1523 # VIR_DOMAIN_SHUTOFF = 5
1524 # VIR_DOMAIN_CRASHED = 6
1525 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
1526
1527 if self.test or len(self.server_status)==0:
1528 return
1529
1530 try:
1531 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1532 domains= conn.listAllDomains()
1533 domain_dict={}
1534 for domain in domains:
1535 uuid = domain.UUIDString() ;
1536 libvirt_status = domain.state()
1537 #print libvirt_status
1538 if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
1539 new_status = "ACTIVE"
1540 elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
1541 new_status = "PAUSED"
1542 elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
1543 new_status = "INACTIVE"
1544 elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
1545 new_status = "ERROR"
1546 else:
1547 new_status = None
1548 domain_dict[uuid] = new_status
1549 conn.close()
1550 except host_thread.lvirt_module.libvirtError as e:
1551 print self.name, ": get_state() Exception '", e.get_error_message()
1552 return
1553
1554 for server_id, current_status in self.server_status.iteritems():
1555 new_status = None
1556 if server_id in domain_dict:
1557 new_status = domain_dict[server_id]
1558 else:
1559 new_status = "INACTIVE"
1560
1561 if new_status == None or new_status == current_status:
1562 continue
1563 if new_status == 'INACTIVE' and current_status == 'ERROR':
1564 continue #keep ERROR status, because obviously this machine is not running
1565 #change status
1566 print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
1567 STATUS={'progress':100, 'status':new_status}
1568 if new_status == 'ERROR':
1569 STATUS['last_error'] = 'machine has crashed'
1570 self.db_lock.acquire()
1571 r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
1572 self.db_lock.release()
1573 if r>=0:
1574 self.server_status[server_id] = new_status
1575
1576 def action_on_server(self, req, last_retry=True):
1577 '''Perform an action on a req
1578 Attributes:
1579 req: dictionary that contain:
1580 server properties: 'uuid','name','tenant_id','status'
1581 action: 'action'
1582 host properties: 'user', 'ip_name'
1583 return (error, text)
1584 0: No error. VM is updated to new state,
1585 -1: Invalid action, as trying to pause a PAUSED VM
1586 -2: Error accessing host
1587 -3: VM nor present
1588 -4: Error at DB access
1589 -5: Error while trying to perform action. VM is updated to ERROR
1590 '''
1591 server_id = req['uuid']
1592 conn = None
1593 new_status = None
1594 old_status = req['status']
1595 last_error = None
1596
1597 if self.test:
1598 if 'terminate' in req['action']:
1599 new_status = 'deleted'
1600 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1601 if req['status']!='ERROR':
1602 time.sleep(5)
1603 new_status = 'INACTIVE'
1604 elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1605 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
1606 elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
1607 elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
1608 elif 'rebuild' in req['action']:
1609 time.sleep(random.randint(20,150))
1610 new_status = 'ACTIVE'
1611 elif 'createImage' in req['action']:
1612 time.sleep(5)
1613 self.create_image(None, req)
1614 else:
1615 try:
1616 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1617 try:
1618 dom = conn.lookupByUUIDString(server_id)
1619 except host_thread.lvirt_module.libvirtError as e:
1620 text = e.get_error_message()
1621 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1622 dom = None
1623 else:
1624 print self.name, ": action_on_server(",server_id,") libvirt exception:", text
1625 raise e
1626
1627 if 'forceOff' in req['action']:
1628 if dom == None:
1629 print self.name, ": action_on_server(",server_id,") domain not running"
1630 else:
1631 try:
1632 print self.name, ": sending DESTROY to server", server_id
1633 dom.destroy()
1634 except Exception as e:
1635 if "domain is not running" not in e.get_error_message():
1636 print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
1637 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1638 new_status = 'ERROR'
1639
1640 elif 'terminate' in req['action']:
1641 if dom == None:
1642 print self.name, ": action_on_server(",server_id,") domain not running"
1643 new_status = 'deleted'
1644 else:
1645 try:
1646 if req['action']['terminate'] == 'force':
1647 print self.name, ": sending DESTROY to server", server_id
1648 dom.destroy()
1649 new_status = 'deleted'
1650 else:
1651 print self.name, ": sending SHUTDOWN to server", server_id
1652 dom.shutdown()
1653 self.pending_terminate_server.append( (time.time()+10,server_id) )
1654 except Exception as e:
1655 print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
1656 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1657 new_status = 'ERROR'
1658 if "domain is not running" in e.get_error_message():
1659 try:
1660 dom.undefine()
1661 new_status = 'deleted'
1662 except Exception:
1663 print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
1664 last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
1665 #Exception: 'virDomainDetachDevice() failed'
1666 if new_status=='deleted':
1667 if server_id in self.server_status:
1668 del self.server_status[server_id]
1669 if req['uuid'] in self.localinfo['server_files']:
1670 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1671 try:
1672 self.delete_file(file_['source file'])
1673 except Exception:
1674 pass
1675 del self.localinfo['server_files'][ req['uuid'] ]
1676 self.localinfo_dirty = True
1677
1678 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1679 try:
1680 if dom == None:
1681 print self.name, ": action_on_server(",server_id,") domain not running"
1682 else:
1683 dom.shutdown()
1684 # new_status = 'INACTIVE'
1685 #TODO: check status for changing at database
1686 except Exception as e:
1687 new_status = 'ERROR'
1688 print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
1689 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1690
1691 elif 'rebuild' in req['action']:
1692 if dom != None:
1693 dom.destroy()
1694 r = self.launch_server(conn, req, True, None)
1695 if r[0] <0:
1696 new_status = 'ERROR'
1697 last_error = r[1]
1698 else:
1699 new_status = 'ACTIVE'
1700 elif 'start' in req['action']:
1701 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1702 rebuild = True if req['action']['start'] == 'rebuild' else False
1703 r = self.launch_server(conn, req, rebuild, dom)
1704 if r[0] <0:
1705 new_status = 'ERROR'
1706 last_error = r[1]
1707 else:
1708 new_status = 'ACTIVE'
1709
1710 elif 'resume' in req['action']:
1711 try:
1712 if dom == None:
1713 pass
1714 else:
1715 dom.resume()
1716 # new_status = 'ACTIVE'
1717 except Exception as e:
1718 print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()
1719
1720 elif 'pause' in req['action']:
1721 try:
1722 if dom == None:
1723 pass
1724 else:
1725 dom.suspend()
1726 # new_status = 'PAUSED'
1727 except Exception as e:
1728 print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()
1729
1730 elif 'reboot' in req['action']:
1731 try:
1732 if dom == None:
1733 pass
1734 else:
1735 dom.reboot()
1736 print self.name, ": action_on_server(",server_id,") reboot:"
1737 #new_status = 'ACTIVE'
1738 except Exception as e:
1739 print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
1740 elif 'createImage' in req['action']:
1741 self.create_image(dom, req)
1742
1743
1744 conn.close()
1745 except host_thread.lvirt_module.libvirtError as e:
1746 if conn is not None: conn.close()
1747 text = e.get_error_message()
1748 new_status = "ERROR"
1749 last_error = text
1750 print self.name, ": action_on_server(",server_id,") Exception '", text
1751 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1752 print self.name, ": action_on_server(",server_id,") Exception removed from host"
1753 #end of if self.test
1754 if new_status == None:
1755 return 1
1756
1757 print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
1758 UPDATE = {'progress':100, 'status':new_status}
1759
1760 if new_status=='ERROR':
1761 if not last_retry: #if there will be another retry do not update database
1762 return -1
1763 elif 'terminate' in req['action']:
1764 #PUT a log in the database
1765 print self.name, ": PANIC deleting server", server_id, last_error
1766 self.db_lock.acquire()
1767 self.db.new_row('logs',
1768 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1769 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1770 )
1771 self.db_lock.release()
1772 if server_id in self.server_status:
1773 del self.server_status[server_id]
1774 return -1
1775 else:
1776 UPDATE['last_error'] = last_error
1777 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1778 self.db_lock.acquire()
1779 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1780 self.server_status[server_id] = new_status
1781 self.db_lock.release()
1782 if new_status == 'ERROR':
1783 return -1
1784 return 1
1785
1786
def restore_iface(self, name, mac, lib_conn=None):
    '''Restore the default parameters of a host interface by restarting it (like ifdown + ifup).

    The interface is looked up through libvirt by its MAC address, destroyed and
    created again.
    Params:
        name: interface name, only used in the log messages
        mac: mac address of the interface
        lib_conn: open libvirt connection to reuse. When it is falsy a new
            connection to the host is opened; that connection is closed before
            returning when lib_conn is None
    Return (0, None) if ok, (-1, error_text) on a libvirt error.
    In test mode nothing is touched and (0, None) is returned.
    '''
    if self.test:
        print("%s : restore_iface '%s' %s" % (self.name, name, mac))
        return 0, None

    conn = None
    try:
        if lib_conn:
            conn = lib_conn
        else:
            conn = host_thread.lvirt_module.open("qemu+ssh://" + self.user + "@" + self.host + "/system")

        # TODO.Revise: wait for the pending VM deletion with self.server_forceoff(True)

        iface = conn.interfaceLookupByMACString(mac)
        iface.destroy()
        iface.create()
        print("%s : restore_iface '%s' %s" % (self.name, name, mac))
        return 0, None
    except host_thread.lvirt_module.libvirtError as e:
        error_text = e.get_error_message()
        print("%s : restore_iface '%s' '%s' libvirt exception: %s" % (self.name, name, mac, error_text))
        return -1, error_text
    finally:
        # only the connection opened by this method is closed here
        if lib_conn is None and conn is not None:
            conn.close()
1821
1822
def create_image(self, dom, req):
    '''Create a new image file from a server disk and record it in the 'images' table.

    Params:
        dom: libvirt domain of the server (not used by this method)
        req: request dict. Reads req['action']['createImage'] (either an explicit
            'path', or 'source' {'path', 'image_id'} plus 'name' to derive a new
            '<dir of source path>/<name>.qcow2' file name), req['uuid'] (server
            id, non-test mode) and req['new_image']['uuid'] (row to update)
    In test mode only the destination path is computed (self.get_notused_path).
    Otherwise the server 'source file' registered at self.localinfo['server_files']
    is copied to the destination. If qemu reports a backing file for the source
    that matches a value of self.localinfo['files'], qemu_change_backing is called
    on the copy with the matching key. A failure is retried once; an ssh
    "SSH session not active" error on the first try reconnects first.
    The image row is always updated with status ACTIVE/ERROR and progress 100;
    'path' is written only when the destination could be computed.
    '''
    file_dst = None  # stays None if the destination could not be computed
    if self.test:
        create_req = req['action']['createImage']
        if 'path' in create_req:
            file_dst = create_req['path']
        else:
            img_name = create_req['source']['path']
            index = img_name.rfind('/')
            file_dst = self.get_notused_path(img_name[:index + 1] + create_req['name'] + '.qcow2')
        image_status = 'ACTIVE'
    else:
        image_status = 'ERROR'
        # read before the try so that the error messages can always reference it
        server_id = req.get('uuid')
        for retry in (0, 1):
            try:
                create_req = req['action']['createImage']
                file_orig = self.localinfo['server_files'][server_id][create_req['source']['image_id']]['source file']
                if 'path' in create_req:
                    file_dst = create_req['path']
                else:
                    img_name = create_req['source']['path']
                    index = img_name.rfind('/')
                    file_dst = self.get_notused_filename(img_name[:index + 1] + create_req['name'] + '.qcow2')

                self.copy_file(file_orig, file_dst)
                qemu_info = self.qemu_get_info(file_orig)
                if 'backing file' in qemu_info:
                    for k, v in self.localinfo['files'].items():
                        if v == qemu_info['backing file']:
                            self.qemu_change_backing(file_dst, k)
                            break
                image_status = 'ACTIVE'
                break
            except paramiko.ssh_exception.SSHException as e:
                image_status = 'ERROR'
                error_text = str(e.args[0]) if e.args else str(e)
                print("%s : create_image( %s ) ssh Exception: %s" % (self.name, server_id, error_text))
                if "SSH session not active" in error_text and retry == 0:
                    self.ssh_connect()
            except Exception as e:
                image_status = 'ERROR'
                error_text = str(e)
                print("%s : create_image( %s ) Exception: %s" % (self.name, server_id, error_text))

    # TODO insert a last_error at database
    update = {'status': image_status, 'progress': 100}
    if file_dst is not None:
        update['path'] = file_dst
    where = {'uuid': req['new_image']['uuid']}
    self.db_lock.acquire()
    try:
        self.db.update_rows('images', update, where, log=True)
    finally:
        self.db_lock.release()
1871
def edit_iface(self, port_id, old_net, new_net):
    '''Detach and/or re-attach (live) the SR-IOV VF of a port so that network changes take effect.

    Params:
        port_id: port uuid, looked up joining 'ports' and 'resources_port'
        old_net: if truthy, the VF <interface> is first detached from the domain
        new_net: if truthy, the VF is attached again with the port 'vlan' tag and
            the port 'vpci' (if any) passed to pci2xml
    Only ports of model "VF" are handled. Database lookup problems and libvirt
    errors are printed, not raised. In test mode it only sleeps one second.
    '''
    # This action implies removing and inserting the interface to put the proper parameters
    if self.test:
        time.sleep(1)
        return

    # get iface details; the lock is released even if the query raises
    self.db_lock.acquire()
    try:
        r, c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                 WHERE={'port_id': port_id})
    finally:
        self.db_lock.release()
    if r < 0:
        print("%s : edit_iface( %s ) DDBB error: %s" % (self.name, port_id, c))
        return
    if r == 0:
        print("%s : edit_iface( %s ) port not found" % (self.name, port_id))
        return
    port = c[0]
    if port["model"] != "VF":
        print("%s : edit_iface( %s ) ERROR model must be VF" % (self.name, port_id))
        return

    # create xml detach file
    xml = []
    self.xml_level = 2  # NOTE(review): presumably consumed by pci2xml for indentation - confirm
    xml.append("<interface type='hostdev' managed='yes'>")
    xml.append(" <mac address='" + port['mac'] + "'/>")
    xml.append(" <source>" + self.pci2xml(port['pci']) + "\n </source>")
    xml.append('</interface>')

    conn = None
    try:
        conn = host_thread.lvirt_module.open("qemu+ssh://" + self.user + "@" + self.host + "/system")
        dom = conn.lookupByUUIDString(port["instance_id"])
        if old_net:
            text = "\n".join(xml)
            print("%s : edit_iface detaching SRIOV interface %s" % (self.name, text))
            dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
        if new_net:
            # the closing tag is replaced by the vlan tag; the vpci element and a new closing tag follow
            xml[-1] = " <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
            self.xml_level = 1
            xml.append(self.pci2xml(port.get('vpci', None)))
            xml.append('</interface>')
            text = "\n".join(xml)
            print("%s : edit_iface attaching SRIOV interface %s" % (self.name, text))
            dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
    except host_thread.lvirt_module.libvirtError as e:
        text = e.get_error_message()
        print("%s : edit_iface( %s ) libvirt exception: %s" % (self.name, port["instance_id"], text))
    finally:
        if conn is not None:
            conn.close()
1924
1925
def _locked_db_call(db_lock, func, *args, **kwargs):
    '''Call func(*args, **kwargs) holding db_lock; the lock is released even if func raises.'''
    db_lock.acquire()
    try:
        return func(*args, **kwargs)
    finally:
        db_lock.release()


def create_server(server, db, db_lock, only_of_ports):
    '''Select a host NUMA node for a new server and build the instance resources dict.

    Params:
        server: server request dict. Reads 'flavor' ('ram', 'vcpus', optional
            'extended' string), optional 'extended', 'networks', 'host_id',
            'description', 'name' and the mandatory 'image_id', 'flavor_id',
            'tenant_id'. Modified in place: 'host_id' is set to the selected
            host, each 'networks' entry gets 'uuid' renamed to 'net_id' (plus
            type/vlan/dhcp fields for control interfaces), and the dataplane
            interfaces of the extended numa get pci/port_id/mac_address/Mbps_used.
        db: database object providing get_numas() and get_table()
        db_lock: lock held around every database call
        only_of_ports: forwarded unchanged to db.get_numas()
    Returns:
        (0, resources) on success; (-2, text) for multi-NUMA requests;
        (-1, error) for any other failure.
    '''
    extended = server.get('extended', None)

    requirements = {}
    requirements['numa'] = {'memory': 0, 'proc_req_type': 'threads', 'proc_req_nb': 0,
                            'port_list': [], 'sriov_list': []}
    requirements['ram'] = server['flavor'].get('ram', 0)
    if requirements['ram'] is None:
        requirements['ram'] = 0
    requirements['vcpus'] = server['flavor'].get('vcpus', 0)
    if requirements['vcpus'] is None:
        requirements['vcpus'] = 0

    # If extended is not defined at the server, take it from the flavor, where it is
    # a string that may use single quotes (converted to valid JSON before parsing)
    if extended is None and server['flavor'].get('extended') is not None:
        extended = json.loads(server['flavor']['extended'].replace("'", "\""))

    if extended is not None:
        numas = extended.get('numas', [])
        # For simplicity only one NUMA per VM is supported in the initial implementation
        if len(numas) > 1:
            return (-2, "Multi-NUMA VMs are not supported yet")

        # a loop is used in order to be ready for multi-NUMA VMs
        request = []
        for numa in numas:
            numa_req = {'memory': numa.get('memory', 0)}
            if 'cores' in numa:
                numa_req['proc_req_nb'] = numa['cores']  # number of cores or threads to be reserved
                numa_req['proc_req_type'] = 'cores'  # whether cores or threads must be reserved
                numa_req['proc_req_list'] = numa.get('cores-id', None)  # ids to assign to the vcpus
            elif 'paired-threads' in numa:
                numa_req['proc_req_nb'] = numa['paired-threads']
                numa_req['proc_req_type'] = 'paired-threads'
                numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
            elif 'threads' in numa:
                numa_req['proc_req_nb'] = numa['threads']
                numa_req['proc_req_type'] = 'threads'
                numa_req['proc_req_list'] = numa.get('threads-id', None)
            else:
                numa_req['proc_req_nb'] = 0  # by default
                numa_req['proc_req_type'] = 'threads'

            # Split the dataplane interfaces into dedicated ports and SR-IOV ones
            sriov_list = []
            port_list = []
            for iface in numa.get('interfaces', []):
                iface['bandwidth'] = int(iface['bandwidth'])
                if iface['dedicated'][:3] == 'yes':
                    port_list.append(iface)
                else:
                    sriov_list.append(iface)

            # Save lists ordered from more restrictive to less bandwidth requirements
            numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
            numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)
            request.append(numa_req)

        # at the moment multi-NUMA VMs are not supported: only the first one is used
        if request:
            requirements['numa'].update(request[0])

    if requirements['numa']['memory'] > 0:
        requirements['ram'] = 0  # by the moment asking for both huge and non huge pages memory is incompatible
    elif requirements['ram'] == 0:
        return (-1, "Memory information not set neither at extended field not at ram")
    if requirements['numa']['proc_req_nb'] > 0:
        requirements['vcpus'] = 0  # by the moment asking for both isolated and non isolated cpus is incompatible
    elif requirements['vcpus'] == 0:
        return (-1, "Processor information not set neither at extended field not at vcpus")

    # Search in db for an appropriate numa
    result, content = _locked_db_call(db_lock, db.get_numas, requirements,
                                      server.get('host_id', None), only_of_ports)
    if result < 0:
        return (-1, content)

    numa_id = content['numa_id']
    host_id = content['host_id']

    # obtain threads_id and calculate pinning: entries are [vcpu_id, thread_id, resources_core id]
    cpu_pinning = []
    reserved_threads = []
    paired = 'N'
    if requirements['numa']['proc_req_nb'] > 0:
        result, content = _locked_db_call(db_lock, db.get_table, FROM='resources_core',
                                          SELECT=('id', 'core_id', 'thread_id'),
                                          WHERE={'numa_id': numa_id, 'instance_id': None, 'status': 'ok'})
        if result <= 0:
            print(content)
            return -1, content

        # group the free threads by core: core_id -> [[thread_id, resources_core id], ...]
        cores_dict = {}
        for row in content:
            cores_dict.setdefault(row['core_id'], []).append([row['thread_id'], row['id']])

        proc_req_type = requirements['numa']['proc_req_type']
        proc_req_list = requirements['numa'].get('proc_req_list')

        if proc_req_type == 'cores':
            # full cores: pin the vcpu to the first thread and reserve its sibling
            if proc_req_list is None:
                vcpu_id_list = list(range(0, int(requirements['numa']['proc_req_nb'])))
            else:
                vcpu_id_list = list(proc_req_list)  # copy: the request list must not be consumed
            for threads in cores_dict.values():
                if len(threads) != 2:
                    continue
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
                reserved_threads.append(threads[1][1])
                if not vcpu_id_list:
                    break

        elif proc_req_type == 'paired-threads':
            # both threads of full cores are pinned
            paired = 'Y'
            if proc_req_list is not None:
                vcpu_id_list = []
                for pair in proc_req_list:
                    if len(pair) != 2:
                        return -1, "Field paired-threads-id not properly specified"
                    vcpu_id_list += [pair[0], pair[1]]
            else:
                vcpu_id_list = list(range(0, 2 * int(requirements['numa']['proc_req_nb'])))
            for threads in cores_dict.values():
                if len(threads) != 2:
                    continue
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
                cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
                if not vcpu_id_list:
                    break

        elif proc_req_type == 'threads':
            if proc_req_list is None:
                vcpu_id_list = list(range(0, int(requirements['numa']['proc_req_nb'])))
            else:
                vcpu_id_list = list(proc_req_list)  # copy: the request list must not be consumed
            # cores with fewer free threads are used first (stable sort keeps dict order on ties)
            for threads in sorted(cores_dict.values(), key=len):
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])
                # if it exists, pin the second thread too
                if len(threads) == 2 and vcpu_id_list:
                    cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])
                if not vcpu_id_list:
                    break

    # Get the source pci addresses for the selected numa
    used_sriov_ports = []
    for port in requirements['numa']['sriov_list']:
        result, content = _locked_db_call(
            db_lock, db.get_table, FROM='resources_port', SELECT=('id', 'pci', 'mac'),
            WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0})
        if result <= 0:
            print(content)
            return -1, content
        for row in content:
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']
            del port['mac']
            port['port_id'] = row['id']
            port['Mbps_used'] = port['bandwidth']
            used_sriov_ports.append(row['id'])
            break

    for port in requirements['numa']['port_list']:
        port['Mbps_used'] = None
        if port['dedicated'] != "yes:sriov":
            port['mac_address'] = port.pop('mac')
            continue
        result, content = _locked_db_call(
            db_lock, db.get_table, FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),
            WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0})
        if result <= 0:
            print(content)
            return -1, content
        port['Mbps_used'] = content[0]['Mbps']
        for row in content:
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']  # mac cannot be set to passthrough ports
            del port['mac']
            port['port_id'] = row['id']
            used_sriov_ports.append(row['id'])
            break

    server['host_id'] = host_id

    # Generate dictionary for saving in db the instance resources
    resources = {}
    resources['bridged-ifaces'] = []

    numa_dict = {}
    numa_dict['interfaces'] = requirements['numa']['port_list'] + requirements['numa']['sriov_list']

    # Check bridge information
    for control_iface in server.get('networks', []):
        control_iface['net_id'] = control_iface.pop('uuid')
        # Get the bridge name
        result, content = _locked_db_call(
            db_lock, db.get_table, FROM='nets',
            SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
                    'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
            WHERE={'uuid': control_iface['net_id']})
        if result < 0:
            # NOTE(review): a database error silently skips this interface (original behavior)
            continue
        if result == 0:
            return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']

        network = content[0]
        if control_iface.get("type", 'virtual') == 'virtual':
            if network['type'] not in ('bridge_data', 'bridge_man'):
                return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
            resources['bridged-ifaces'].append(control_iface)
            if network.get("provider") and network["provider"][0:3] == "OVS":
                control_iface["type"] = "instance:ovs"
            else:
                control_iface["type"] = "instance:bridge"
            if network.get("vlan"):
                control_iface["vlan"] = network["vlan"]

            if network.get("enable_dhcp") == 'true':
                control_iface["enable_dhcp"] = network.get("enable_dhcp")
                control_iface["dhcp_first_ip"] = network["dhcp_first_ip"]
                control_iface["dhcp_last_ip"] = network["dhcp_last_ip"]
                control_iface["cidr"] = network["cidr"]
        else:
            if network['type'] not in ('data', 'ptp'):
                return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
            # dataplane interface: look for it in the numa tree and assign this network
            for dataplane_iface in numa_dict['interfaces']:
                if dataplane_iface['name'] != control_iface.get("name"):
                    continue
                if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
                        (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
                        (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared"):
                    return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
                        (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
                dataplane_iface['uuid'] = control_iface['net_id']
                if dataplane_iface['dedicated'] == "no":
                    dataplane_iface['vlan'] = network['vlan']
                if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
                    dataplane_iface['mac_address'] = control_iface.get("mac_address")
                if control_iface.get("vpci"):
                    dataplane_iface['vpci'] = control_iface.get("vpci")
                break
            else:
                return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")

    resources['host_id'] = host_id
    resources['image_id'] = server['image_id']
    resources['flavor_id'] = server['flavor_id']
    resources['tenant_id'] = server['tenant_id']
    resources['ram'] = requirements['ram']
    resources['vcpus'] = requirements['vcpus']
    resources['status'] = 'CREATING'

    if 'description' in server:
        resources['description'] = server['description']
    if 'name' in server:
        resources['name'] = server['name']

    resources['extended'] = {}  # optional
    resources['extended']['numas'] = []
    numa_dict['numa_id'] = numa_id
    numa_dict['memory'] = requirements['numa']['memory']
    # pinned threads first, then the reserved (unused) sibling threads
    numa_dict['cores'] = [{'id': core[2], 'vthread': core[0], 'paired': paired} for core in cpu_pinning]
    numa_dict['cores'] += [{'id': core} for core in reserved_threads]
    resources['extended']['numas'].append(numa_dict)
    if extended is not None and 'devices' in extended:  # TODO allow extra devices without numa
        resources['extended']['devices'] = extended['devices']

    print('===================================={')
    print(json.dumps(resources, indent=4))
    print('====================================}')

    return 0, resources
2274