bf7a6da09c2b3816e59f4b07ac14a6951990f914
[osm/openvim.git] / osm_openvim / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 This is the thread that interacts with the host and libvirt to manage VMs.
26 One thread is launched per host.
27 '''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 from jsonschema import validate as js_v, exceptions as js_e
38 #import libvirt
39 import imp
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 import os
43 import logging
44
45
46 class host_thread(threading.Thread):
47 lvirt_module = None
48
49 def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
50 develop_bridge_iface, logger_name=None, debug=None):
51 '''Init a thread.
52 Arguments:
53 'id' number of thead
54 'name' name of thread
55 'host','user': host ip or name to manage and user
56 'db', 'db_lock': database class and lock to use it in exclusion
57 '''
58 threading.Thread.__init__(self)
59 self.name = name
60 self.host = host
61 self.user = user
62 self.db = db
63 self.db_lock = db_lock
64 self.test = test
65 self.localinfo_dirty = False
66
67 if not test and not host_thread.lvirt_module:
68 try:
69 module_info = imp.find_module("libvirt")
70 host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
71 except (IOError, ImportError) as e:
72 raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))
73 if logger_name:
74 self.logger_name = logger_name
75 else:
76 self.logger_name = "openvim.host."+name
77 self.logger = logging.getLogger(self.logger_name)
78 if debug:
79 self.logger.setLevel(getattr(logging, debug))
80
81
82 self.develop_mode = develop_mode
83 self.develop_bridge_iface = develop_bridge_iface
84 self.image_path = image_path
85 self.host_id = host_id
86 self.version = version
87
88 self.xml_level = 0
89 #self.pending ={}
90
91 self.server_status = {} #dictionary with pairs server_uuid:server_status
92 self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
93 self.next_update_server_status = 0 #time when must be check servers status
94
95 self.hostinfo = None
96
97 self.queueLock = threading.Lock()
98 self.taskQueue = Queue.Queue(2000)
99 self.ssh_conn = None
100
101 def ssh_connect(self):
102 try:
103 #Connect SSH
104 self.ssh_conn = paramiko.SSHClient()
105 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
106 self.ssh_conn.load_system_host_keys()
107 self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
108 except paramiko.ssh_exception.SSHException as e:
109 text = e.args[0]
110 self.logger.error("ssh_connect ssh Exception: " + text)
111
112 def load_localinfo(self):
113 if not self.test:
114 try:
115 #Connect SSH
116 self.ssh_connect()
117
118 command = 'mkdir -p ' + self.image_path
119 #print self.name, ': command:', command
120 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
121 content = stderr.read()
122 if len(content) > 0:
123 self.logger.error("command: '%s' stderr: '%s'", command, content)
124
125 command = 'cat ' + self.image_path + '/.openvim.yaml'
126 #print self.name, ': command:', command
127 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
128 content = stdout.read()
129 if len(content) == 0:
130 self.logger.error("command: '%s' stderr='%s'", command, stderr.read())
131 raise paramiko.ssh_exception.SSHException("Error empty file, command: '{}'".format(command))
132 self.localinfo = yaml.load(content)
133 js_v(self.localinfo, localinfo_schema)
134 self.localinfo_dirty = False
135 if 'server_files' not in self.localinfo:
136 self.localinfo['server_files'] = {}
137 self.logger.debug("localinfo load from host")
138 return
139
140 except paramiko.ssh_exception.SSHException as e:
141 text = e.args[0]
142 self.logger.error("load_localinfo ssh Exception: " + text)
143 except host_thread.lvirt_module.libvirtError as e:
144 text = e.get_error_message()
145 self.logger.error("load_localinfo libvirt Exception: " + text)
146 except yaml.YAMLError as exc:
147 text = ""
148 if hasattr(exc, 'problem_mark'):
149 mark = exc.problem_mark
150 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
151 self.logger.error("load_localinfo yaml format Exception " + text)
152 except js_e.ValidationError as e:
153 text = ""
154 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
155 self.logger.error("load_localinfo format Exception: %s %s", text, str(e))
156 except Exception as e:
157 text = str(e)
158 self.logger.error("load_localinfo Exception: " + text)
159
160 #not loaded, insert a default data and force saving by activating dirty flag
161 self.localinfo = {'files':{}, 'server_files':{} }
162 #self.localinfo_dirty=True
163 self.localinfo_dirty=False
164
165 def load_hostinfo(self):
166 if self.test:
167 return;
168 try:
169 #Connect SSH
170 self.ssh_connect()
171
172
173 command = 'cat ' + self.image_path + '/hostinfo.yaml'
174 #print self.name, ': command:', command
175 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
176 content = stdout.read()
177 if len(content) == 0:
178 self.logger.error("command: '%s' stderr: '%s'", command, stderr.read())
179 raise paramiko.ssh_exception.SSHException("Error empty file ")
180 self.hostinfo = yaml.load(content)
181 js_v(self.hostinfo, hostinfo_schema)
182 self.logger.debug("hostlinfo load from host " + str(self.hostinfo))
183 return
184
185 except paramiko.ssh_exception.SSHException as e:
186 text = e.args[0]
187 self.logger.error("load_hostinfo ssh Exception: " + text)
188 except host_thread.lvirt_module.libvirtError as e:
189 text = e.get_error_message()
190 self.logger.error("load_hostinfo libvirt Exception: " + text)
191 except yaml.YAMLError as exc:
192 text = ""
193 if hasattr(exc, 'problem_mark'):
194 mark = exc.problem_mark
195 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
196 self.logger.error("load_hostinfo yaml format Exception " + text)
197 except js_e.ValidationError as e:
198 text = ""
199 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
200 self.logger.error("load_hostinfo format Exception: %s %s", text, e.message)
201 except Exception as e:
202 text = str(e)
203 self.logger.error("load_hostinfo Exception: " + text)
204
205 #not loaded, insert a default data
206 self.hostinfo = None
207
208 def save_localinfo(self, tries=3):
209 if self.test:
210 self.localinfo_dirty = False
211 return
212
213 while tries>=0:
214 tries-=1
215
216 try:
217 command = 'cat > ' + self.image_path + '/.openvim.yaml'
218 self.logger.debug("command:" + command)
219 (stdin, _, _) = self.ssh_conn.exec_command(command)
220 yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
221 self.localinfo_dirty = False
222 break #while tries
223
224 except paramiko.ssh_exception.SSHException as e:
225 text = e.args[0]
226 self.logger.error("save_localinfo ssh Exception: " + text)
227 if "SSH session not active" in text:
228 self.ssh_connect()
229 except host_thread.lvirt_module.libvirtError as e:
230 text = e.get_error_message()
231 self.logger.error("save_localinfo libvirt Exception: " + text)
232 except yaml.YAMLError as exc:
233 text = ""
234 if hasattr(exc, 'problem_mark'):
235 mark = exc.problem_mark
236 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
237 self.logger.error("save_localinfo yaml format Exception " + text)
238 except Exception as e:
239 text = str(e)
240 self.logger.error("save_localinfo Exception: " + text)
241
242 def load_servers_from_db(self):
243 self.db_lock.acquire()
244 r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
245 self.db_lock.release()
246
247 self.server_status = {}
248 if r<0:
249 self.logger.error("Error getting data from database: " + c)
250 return
251 for server in c:
252 self.server_status[ server['uuid'] ] = server['status']
253
254 #convert from old version to new one
255 if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
256 server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
257 if server_files_dict['source file'][-5:] == 'qcow2':
258 server_files_dict['file format'] = 'qcow2'
259
260 self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
261 if 'inc_files' in self.localinfo:
262 del self.localinfo['inc_files']
263 self.localinfo_dirty = True
264
265 def delete_unused_files(self):
266 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
267 Deletes unused entries at self.loacalinfo and the corresponding local files.
268 The only reason for this mismatch is the manual deletion of instances (VM) at database
269 '''
270 if self.test:
271 return
272 for uuid,images in self.localinfo['server_files'].items():
273 if uuid not in self.server_status:
274 for localfile in images.values():
275 try:
276 self.logger.debug("deleting file '%s' of unused server '%s'", localfile['source file'], uuid)
277 self.delete_file(localfile['source file'])
278 except paramiko.ssh_exception.SSHException as e:
279 self.logger.error("Exception deleting file '%s': %s", localfile['source file'], str(e))
280 del self.localinfo['server_files'][uuid]
281 self.localinfo_dirty = True
282
283 def insert_task(self, task, *aditional):
284 try:
285 self.queueLock.acquire()
286 task = self.taskQueue.put( (task,) + aditional, timeout=5)
287 self.queueLock.release()
288 return 1, None
289 except Queue.Full:
290 return -1, "timeout inserting a task over host " + self.name
291
292 def run(self):
293 while True:
294 self.load_localinfo()
295 self.load_hostinfo()
296 self.load_servers_from_db()
297 self.delete_unused_files()
298 while True:
299 try:
300 self.queueLock.acquire()
301 if not self.taskQueue.empty():
302 task = self.taskQueue.get()
303 else:
304 task = None
305 self.queueLock.release()
306
307 if task is None:
308 now=time.time()
309 if self.localinfo_dirty:
310 self.save_localinfo()
311 elif self.next_update_server_status < now:
312 self.update_servers_status()
313 self.next_update_server_status = now + 5
314 elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
315 self.server_forceoff()
316 else:
317 time.sleep(1)
318 continue
319
320 if task[0] == 'instance':
321 self.logger.debug("processing task instance " + str(task[1]['action']))
322 retry = 0
323 while retry < 2:
324 retry += 1
325 r = self.action_on_server(task[1], retry==2)
326 if r >= 0:
327 break
328 elif task[0] == 'image':
329 pass
330 elif task[0] == 'exit':
331 self.logger.debug("processing task exit")
332 self.terminate()
333 return 0
334 elif task[0] == 'reload':
335 self.logger.debug("processing task reload terminating and relaunching")
336 self.terminate()
337 break
338 elif task[0] == 'edit-iface':
339 self.logger.debug("processing task edit-iface port={}, old_net={}, new_net={}".format(
340 task[1], task[2], task[3]))
341 self.edit_iface(task[1], task[2], task[3])
342 elif task[0] == 'restore-iface':
343 self.logger.debug("processing task restore-iface={} mac={}".format(task[1], task[2]))
344 self.restore_iface(task[1], task[2])
345 elif task[0] == 'new-ovsbridge':
346 self.logger.debug("Creating compute OVS bridge")
347 self.create_ovs_bridge()
348 elif task[0] == 'new-vxlan':
349 self.logger.debug("Creating vxlan tunnel='{}', remote ip='{}'".format(task[1], task[2]))
350 self.create_ovs_vxlan_tunnel(task[1], task[2])
351 elif task[0] == 'del-ovsbridge':
352 self.logger.debug("Deleting OVS bridge")
353 self.delete_ovs_bridge()
354 elif task[0] == 'del-vxlan':
355 self.logger.debug("Deleting vxlan {} tunnel".format(task[1]))
356 self.delete_ovs_vxlan_tunnel(task[1])
357 elif task[0] == 'create-ovs-bridge-port':
358 self.logger.debug("Adding port ovim-{} to OVS bridge".format(task[1]))
359 self.create_ovs_bridge_port(task[1])
360 elif task[0] == 'del-ovs-port':
361 self.logger.debug("Delete bridge attached to ovs port vlan {} net {}".format(task[1], task[2]))
362 self.delete_bridge_port_attached_to_ovs(task[1], task[2])
363 else:
364 self.logger.debug("unknown task " + str(task))
365
366 except Exception as e:
367 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
368
369 def server_forceoff(self, wait_until_finished=False):
370 while len(self.pending_terminate_server)>0:
371 now = time.time()
372 if self.pending_terminate_server[0][0]>now:
373 if wait_until_finished:
374 time.sleep(1)
375 continue
376 else:
377 return
378 req={'uuid':self.pending_terminate_server[0][1],
379 'action':{'terminate':'force'},
380 'status': None
381 }
382 self.action_on_server(req)
383 self.pending_terminate_server.pop(0)
384
385 def terminate(self):
386 try:
387 self.server_forceoff(True)
388 if self.localinfo_dirty:
389 self.save_localinfo()
390 if not self.test:
391 self.ssh_conn.close()
392 except Exception as e:
393 text = str(e)
394 self.logger.error("terminate Exception: " + text)
395 self.logger.debug("exit from host_thread")
396
397 def get_local_iface_name(self, generic_name):
398 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
399 return self.hostinfo["iface_names"][generic_name]
400 return generic_name
401
402 def create_xml_server(self, server, dev_list, server_metadata={}):
403 """Function that implements the generation of the VM XML definition.
404 Additional devices are in dev_list list
405 The main disk is upon dev_list[0]"""
406
407 #get if operating system is Windows
408 windows_os = False
409 os_type = server_metadata.get('os_type', None)
410 if os_type == None and 'metadata' in dev_list[0]:
411 os_type = dev_list[0]['metadata'].get('os_type', None)
412 if os_type != None and os_type.lower() == "windows":
413 windows_os = True
414 #get type of hard disk bus
415 bus_ide = True if windows_os else False
416 bus = server_metadata.get('bus', None)
417 if bus == None and 'metadata' in dev_list[0]:
418 bus = dev_list[0]['metadata'].get('bus', None)
419 if bus != None:
420 bus_ide = True if bus=='ide' else False
421
422 self.xml_level = 0
423
424 text = "<domain type='kvm'>"
425 #get topology
426 topo = server_metadata.get('topology', None)
427 if topo == None and 'metadata' in dev_list[0]:
428 topo = dev_list[0]['metadata'].get('topology', None)
429 #name
430 name = server.get('name','') + "_" + server['uuid']
431 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
432 text += self.inc_tab() + "<name>" + name+ "</name>"
433 #uuid
434 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
435
436 numa={}
437 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
438 numa = server['extended']['numas'][0]
439 #memory
440 use_huge = False
441 memory = int(numa.get('memory',0))*1024*1024 #in KiB
442 if memory==0:
443 memory = int(server['ram'])*1024;
444 else:
445 if not self.develop_mode:
446 use_huge = True
447 if memory==0:
448 return -1, 'No memory assigned to instance'
449 memory = str(memory)
450 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
451 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
452 if use_huge:
453 text += self.tab()+'<memoryBacking>'+ \
454 self.inc_tab() + '<hugepages/>'+ \
455 self.dec_tab()+ '</memoryBacking>'
456
457 #cpu
458 use_cpu_pinning=False
459 vcpus = int(server.get("vcpus",0))
460 cpu_pinning = []
461 if 'cores-source' in numa:
462 use_cpu_pinning=True
463 for index in range(0, len(numa['cores-source'])):
464 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
465 vcpus += 1
466 if 'threads-source' in numa:
467 use_cpu_pinning=True
468 for index in range(0, len(numa['threads-source'])):
469 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
470 vcpus += 1
471 if 'paired-threads-source' in numa:
472 use_cpu_pinning=True
473 for index in range(0, len(numa['paired-threads-source'])):
474 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
475 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
476 vcpus += 2
477
478 if use_cpu_pinning and not self.develop_mode:
479 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
480 self.tab()+'<cputune>'
481 self.xml_level += 1
482 for i in range(0, len(cpu_pinning)):
483 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
484 text += self.dec_tab()+'</cputune>'+ \
485 self.tab() + '<numatune>' +\
486 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
487 self.dec_tab() + '</numatune>'
488 else:
489 if vcpus==0:
490 return -1, "Instance without number of cpus"
491 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
492
493 #boot
494 boot_cdrom = False
495 for dev in dev_list:
496 if dev['type']=='cdrom' :
497 boot_cdrom = True
498 break
499 text += self.tab()+ '<os>' + \
500 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
501 if boot_cdrom:
502 text += self.tab() + "<boot dev='cdrom'/>"
503 text += self.tab() + "<boot dev='hd'/>" + \
504 self.dec_tab()+'</os>'
505 #features
506 text += self.tab()+'<features>'+\
507 self.inc_tab()+'<acpi/>' +\
508 self.tab()+'<apic/>' +\
509 self.tab()+'<pae/>'+ \
510 self.dec_tab() +'</features>'
511 if topo == "oneSocket:hyperthreading":
512 if vcpus % 2 != 0:
513 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
514 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % vcpus/2
515 elif windows_os or topo == "oneSocket":
516 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
517 else:
518 text += self.tab() + "<cpu mode='host-model'></cpu>"
519 text += self.tab() + "<clock offset='utc'/>" +\
520 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
521 self.tab() + "<on_reboot>restart</on_reboot>" + \
522 self.tab() + "<on_crash>restart</on_crash>"
523 text += self.tab() + "<devices>" + \
524 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
525 self.tab() + "<serial type='pty'>" +\
526 self.inc_tab() + "<target port='0'/>" + \
527 self.dec_tab() + "</serial>" +\
528 self.tab() + "<console type='pty'>" + \
529 self.inc_tab()+ "<target type='serial' port='0'/>" + \
530 self.dec_tab()+'</console>'
531 if windows_os:
532 text += self.tab() + "<controller type='usb' index='0'/>" + \
533 self.tab() + "<controller type='ide' index='0'/>" + \
534 self.tab() + "<input type='mouse' bus='ps2'/>" + \
535 self.tab() + "<sound model='ich6'/>" + \
536 self.tab() + "<video>" + \
537 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
538 self.dec_tab() + "</video>" + \
539 self.tab() + "<memballoon model='virtio'/>" + \
540 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
541
542 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
543 #> self.dec_tab()+'</hostdev>\n' +\
544 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
545 if windows_os:
546 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
547 else:
548 #If image contains 'GRAPH' include graphics
549 #if 'GRAPH' in image:
550 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
551 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
552 self.dec_tab() + "</graphics>"
553
554 vd_index = 'a'
555 for dev in dev_list:
556 bus_ide_dev = bus_ide
557 if dev['type']=='cdrom' or dev['type']=='disk':
558 if dev['type']=='cdrom':
559 bus_ide_dev = True
560 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
561 if 'file format' in dev:
562 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
563 if 'source file' in dev:
564 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
565 #elif v['type'] == 'block':
566 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
567 #else:
568 # return -1, 'Unknown disk type ' + v['type']
569 vpci = dev.get('vpci',None)
570 if vpci == None:
571 vpci = dev['metadata'].get('vpci',None)
572 text += self.pci2xml(vpci)
573
574 if bus_ide_dev:
575 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
576 else:
577 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
578 text += self.dec_tab() + '</disk>'
579 vd_index = chr(ord(vd_index)+1)
580 elif dev['type']=='xml':
581 dev_text = dev['xml']
582 if 'vpci' in dev:
583 dev_text = dev_text.replace('__vpci__', dev['vpci'])
584 if 'source file' in dev:
585 dev_text = dev_text.replace('__file__', dev['source file'])
586 if 'file format' in dev:
587 dev_text = dev_text.replace('__format__', dev['source file'])
588 if '__dev__' in dev_text:
589 dev_text = dev_text.replace('__dev__', vd_index)
590 vd_index = chr(ord(vd_index)+1)
591 text += dev_text
592 else:
593 return -1, 'Unknown device type ' + dev['type']
594
595 net_nb=0
596 bridge_interfaces = server.get('networks', [])
597 for v in bridge_interfaces:
598 #Get the brifge name
599 self.db_lock.acquire()
600 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
601 self.db_lock.release()
602 if result <= 0:
603 self.logger.error("create_xml_server ERROR %d getting nets %s", result, content)
604 return -1, content
605 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
606 #I know it is not secure
607 #for v in sorted(desc['network interfaces'].itervalues()):
608 model = v.get("model", None)
609 if content[0]['provider']=='default':
610 text += self.tab() + "<interface type='network'>" + \
611 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
612 elif content[0]['provider'][0:7]=='macvtap':
613 text += self.tab()+"<interface type='direct'>" + \
614 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
615 self.tab() + "<target dev='macvtap0'/>"
616 if windows_os:
617 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
618 elif model==None:
619 model = "virtio"
620 elif content[0]['provider'][0:6]=='bridge':
621 text += self.tab() + "<interface type='bridge'>" + \
622 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
623 if windows_os:
624 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
625 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
626 elif model==None:
627 model = "virtio"
628 elif content[0]['provider'][0:3] == "OVS":
629 vlan = content[0]['provider'].replace('OVS:', '')
630 text += self.tab() + "<interface type='bridge'>" + \
631 self.inc_tab() + "<source bridge='ovim-" + str(vlan) + "'/>"
632 else:
633 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
634 if model!=None:
635 text += self.tab() + "<model type='" +model+ "'/>"
636 if v.get('mac_address', None) != None:
637 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
638 text += self.pci2xml(v.get('vpci',None))
639 text += self.dec_tab()+'</interface>'
640
641 net_nb += 1
642
643 interfaces = numa.get('interfaces', [])
644
645 net_nb=0
646 for v in interfaces:
647 if self.develop_mode: #map these interfaces to bridges
648 text += self.tab() + "<interface type='bridge'>" + \
649 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
650 if windows_os:
651 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
652 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
653 else:
654 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
655 if v.get('mac_address', None) != None:
656 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
657 text += self.pci2xml(v.get('vpci',None))
658 text += self.dec_tab()+'</interface>'
659 continue
660
661 if v['dedicated'] == 'yes': #passthrought
662 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
663 self.inc_tab() + "<source>"
664 self.inc_tab()
665 text += self.pci2xml(v['source'])
666 text += self.dec_tab()+'</source>'
667 text += self.pci2xml(v.get('vpci',None))
668 if windows_os:
669 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
670 text += self.dec_tab()+'</hostdev>'
671 net_nb += 1
672 else: #sriov_interfaces
673 #skip not connected interfaces
674 if v.get("net_id") == None:
675 continue
676 text += self.tab() + "<interface type='hostdev' managed='yes'>"
677 self.inc_tab()
678 if v.get('mac_address', None) != None:
679 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
680 text+= self.tab()+'<source>'
681 self.inc_tab()
682 text += self.pci2xml(v['source'])
683 text += self.dec_tab()+'</source>'
684 if v.get('vlan',None) != None:
685 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
686 text += self.pci2xml(v.get('vpci',None))
687 if windows_os:
688 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
689 text += self.dec_tab()+'</interface>'
690
691
692 text += self.dec_tab()+'</devices>'+\
693 self.dec_tab()+'</domain>'
694 return 0, text
695
696 def pci2xml(self, pci):
697 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
698 alows an empty pci text'''
699 if pci is None:
700 return ""
701 first_part = pci.split(':')
702 second_part = first_part[2].split('.')
703 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
704 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
705 "' function='0x" + second_part[1] + "'/>"
706
707 def tab(self):
708 """Return indentation according to xml_level"""
709 return "\n" + (' '*self.xml_level)
710
711 def inc_tab(self):
712 """Increment and return indentation according to xml_level"""
713 self.xml_level += 1
714 return self.tab()
715
716 def dec_tab(self):
717 """Decrement and return indentation according to xml_level"""
718 self.xml_level -= 1
719 return self.tab()
720
721 def create_ovs_bridge(self):
722 """
723 Create a bridge in compute OVS to allocate VMs
724 :return: True if success
725 """
726 if self.test:
727 return True
728 try:
729 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
730 self.logger.debug("command: " + command)
731 (_, stdout, _) = self.ssh_conn.exec_command(command)
732 content = stdout.read()
733 if len(content) == 0:
734 return True
735 else:
736 return False
737 except paramiko.ssh_exception.SSHException as e:
738 self.logger.error("create_ovs_bridge ssh Exception: " + str(e))
739 if "SSH session not active" in str(e):
740 self.ssh_connect()
741 return False
742
743 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
744 """
745 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
746 :param vlan: vlan port id
747 :param net_uuid: network id
748 :return:
749 """
750
751 if self.test:
752 return True
753 try:
754 port_name = 'ovim-' + str(vlan)
755 command = 'sudo ovs-vsctl del-port br-int ' + port_name
756 self.logger.debug("command: " + command)
757 (_, stdout, _) = self.ssh_conn.exec_command(command)
758 content = stdout.read()
759 if len(content) == 0:
760 return True
761 else:
762 return False
763 except paramiko.ssh_exception.SSHException as e:
764 self.logger.error("delete_port_to_ovs_bridge ssh Exception: " + str(e))
765 if "SSH session not active" in str(e):
766 self.ssh_connect()
767 return False
768
769 def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
770 """
771 Delete dhcp server process lining in namespace
772 :param vlan: segmentation id
773 :param net_uuid: network uuid
774 :param dhcp_path: conf fiel path that live in namespace side
775 :return:
776 """
777 if self.test:
778 return True
779 if not self.is_dhcp_port_free(vlan, net_uuid):
780 return True
781 try:
782 net_namespace = 'ovim-' + str(vlan)
783 dhcp_path = os.path.join(dhcp_path, net_namespace)
784 pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')
785
786 command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file
787 self.logger.debug("command: " + command)
788 (_, stdout, _) = self.ssh_conn.exec_command(command)
789 content = stdout.read()
790
791 command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content
792 self.logger.debug("command: " + command)
793 (_, stdout, _) = self.ssh_conn.exec_command(command)
794 content = stdout.read()
795
796 # if len(content) == 0:
797 # return True
798 # else:
799 # return False
800 except paramiko.ssh_exception.SSHException as e:
801 self.logger.error("delete_dhcp_server ssh Exception: " + str(e))
802 if "SSH session not active" in str(e):
803 self.ssh_connect()
804 return False
805
806 def is_dhcp_port_free(self, host_id, net_uuid):
807 """
808 Check if any port attached to the a net in a vxlan mesh across computes nodes
809 :param host_id: host id
810 :param net_uuid: network id
811 :return: True if is not free
812 """
813 self.db_lock.acquire()
814 result, content = self.db.get_table(
815 FROM='ports',
816 WHERE={'type': 'instance:ovs', 'net_id': net_uuid}
817 )
818 self.db_lock.release()
819
820 if len(content) > 0:
821 return False
822 else:
823 return True
824
825 def is_port_free(self, host_id, net_uuid):
826 """
827 Check if there not ovs ports of a network in a compute host.
828 :param host_id: host id
829 :param net_uuid: network id
830 :return: True if is not free
831 """
832
833 self.db_lock.acquire()
834 result, content = self.db.get_table(
835 FROM='ports as p join instances as i on p.instance_id=i.uuid',
836 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
837 )
838 self.db_lock.release()
839
840 if len(content) > 0:
841 return False
842 else:
843 return True
844
845 def add_port_to_ovs_bridge(self, vlan):
846 """
847 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
848 :param vlan: vlan port id
849 :return: True if success
850 """
851
852 if self.test:
853 return True
854 try:
855 port_name = 'ovim-' + str(vlan)
856 command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + str(vlan)
857 self.logger.debug("command: " + command)
858 (_, stdout, _) = self.ssh_conn.exec_command(command)
859 content = stdout.read()
860 if len(content) == 0:
861 return True
862 else:
863 return False
864 except paramiko.ssh_exception.SSHException as e:
865 self.logger.error("add_port_to_ovs_bridge ssh Exception: " + str(e))
866 if "SSH session not active" in str(e):
867 self.ssh_connect()
868 return False
869
870 def delete_dhcp_port(self, vlan, net_uuid):
871 """
872 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
873 :param vlan: segmentation id
874 :param net_uuid: network id
875 :return: True if success
876 """
877
878 if self.test:
879 return True
880
881 if not self.is_dhcp_port_free(vlan, net_uuid):
882 return True
883 self.delete_dhcp_interfaces(vlan)
884 return True
885
886 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
887 """
888 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
889 :param vlan:
890 :param net_uuid:
891 :return: True if success
892 """
893 if self.test:
894 return
895
896 if not self.is_port_free(vlan, net_uuid):
897 return True
898 self.delete_port_to_ovs_bridge(vlan, net_uuid)
899 self.delete_linux_bridge(vlan)
900 return True
901
902 def delete_linux_bridge(self, vlan):
903 """
904 Delete a linux bridge in a scpecific compute.
905 :param vlan: vlan port id
906 :return: True if success
907 """
908
909 if self.test:
910 return True
911 try:
912 port_name = 'ovim-' + str(vlan)
913 command = 'sudo ip link set dev veth0-' + str(vlan) + ' down'
914 self.logger.debug("command: " + command)
915 (_, stdout, _) = self.ssh_conn.exec_command(command)
916 # content = stdout.read()
917 #
918 # if len(content) != 0:
919 # return False
920 command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name
921 self.logger.debug("command: " + command)
922 (_, stdout, _) = self.ssh_conn.exec_command(command)
923 content = stdout.read()
924 if len(content) == 0:
925 return True
926 else:
927 return False
928 except paramiko.ssh_exception.SSHException as e:
929 self.logger.error("delete_linux_bridge ssh Exception: " + str(e))
930 if "SSH session not active" in str(e):
931 self.ssh_connect()
932 return False
933
934 def create_ovs_bridge_port(self, vlan):
935 """
936 Generate a linux bridge and attache the port to a OVS bridge
937 :param vlan: vlan port id
938 :return:
939 """
940 if self.test:
941 return
942 self.create_linux_bridge(vlan)
943 self.add_port_to_ovs_bridge(vlan)
944
945 def create_linux_bridge(self, vlan):
946 """
947 Create a linux bridge with STP active
948 :param vlan: netowrk vlan id
949 :return:
950 """
951
952 if self.test:
953 return True
954 try:
955 port_name = 'ovim-' + str(vlan)
956 command = 'sudo brctl show | grep ' + port_name
957 self.logger.debug("command: " + command)
958 (_, stdout, _) = self.ssh_conn.exec_command(command)
959 content = stdout.read()
960
961 # if exist nothing to create
962 # if len(content) == 0:
963 # return False
964
965 command = 'sudo brctl addbr ' + port_name
966 self.logger.debug("command: " + command)
967 (_, stdout, _) = self.ssh_conn.exec_command(command)
968 content = stdout.read()
969
970 # if len(content) == 0:
971 # return True
972 # else:
973 # return False
974
975 command = 'sudo brctl stp ' + port_name + ' on'
976 self.logger.debug("command: " + command)
977 (_, stdout, _) = self.ssh_conn.exec_command(command)
978 content = stdout.read()
979
980 # if len(content) == 0:
981 # return True
982 # else:
983 # return False
984 command = 'sudo ip link set dev ' + port_name + ' up'
985 self.logger.debug("command: " + command)
986 (_, stdout, _) = self.ssh_conn.exec_command(command)
987 content = stdout.read()
988
989 if len(content) == 0:
990 return True
991 else:
992 return False
993 except paramiko.ssh_exception.SSHException as e:
994 self.logger.error("create_linux_bridge ssh Exception: " + str(e))
995 if "SSH session not active" in str(e):
996 self.ssh_connect()
997 return False
998
999 def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
1000 """
1001 Write into dhcp conf file a rule to assigned a fixed ip given to an specific MAC address
1002 :param ip: IP address asigned to a VM
1003 :param mac: VM vnic mac to be macthed with the IP received
1004 :param vlan: Segmentation id
1005 :param netmask: netmask value
1006 :param path: dhcp conf file path that live in namespace side
1007 :return: True if success
1008 """
1009
1010 if self.test:
1011 return True
1012
1013 net_namespace = 'ovim-' + str(vlan)
1014 dhcp_path = os.path.join(dhcp_path, net_namespace)
1015 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1016
1017 if not ip:
1018 return False
1019 try:
1020 ip_data = mac.upper() + ',' + ip
1021
1022 command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
1023 self.logger.debug("command: " + command)
1024 (_, stdout, _) = self.ssh_conn.exec_command(command)
1025 content = stdout.read()
1026
1027 command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'
1028
1029 self.logger.debug("command: " + command)
1030 (_, stdout, _) = self.ssh_conn.exec_command(command)
1031 content = stdout.read()
1032
1033 if len(content) == 0:
1034 return True
1035 else:
1036 return False
1037 except paramiko.ssh_exception.SSHException as e:
1038 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1039 if "SSH session not active" in str(e):
1040 self.ssh_connect()
1041 return False
1042
1043 def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path):
1044 """
1045 Delete into dhcp conf file the ip assigned to a specific MAC address
1046
1047 :param ip: IP address asigned to a VM
1048 :param mac: VM vnic mac to be macthed with the IP received
1049 :param vlan: Segmentation id
1050 :param dhcp_path: dhcp conf file path that live in namespace side
1051 :return:
1052 """
1053
1054 if self.test:
1055 return False
1056 try:
1057 net_namespace = 'ovim-' + str(vlan)
1058 dhcp_path = os.path.join(dhcp_path, net_namespace)
1059 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1060
1061 if not ip:
1062 return False
1063
1064 ip_data = mac.upper() + ',' + ip
1065
1066 command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir
1067 self.logger.debug("command: " + command)
1068 (_, stdout, _) = self.ssh_conn.exec_command(command)
1069 content = stdout.read()
1070
1071 if len(content) == 0:
1072 return True
1073 else:
1074 return False
1075
1076 except paramiko.ssh_exception.SSHException as e:
1077 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1078 if "SSH session not active" in str(e):
1079 self.ssh_connect()
1080 return False
1081
    def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway):
        """
        Launch a dnsmasq process inside the net namespace of a vlan, unless the
        previously recorded pid is still alive.
        :param vlan: Segmentation id
        :param ip_range: IP dhcp range, sequence with first and last address
        :param netmask: network netmask
        :param dhcp_path: dhcp conf file path that live in namespace side
        :param gateway: Gateway address for dhcp net
        :return: True if success
        """

        if self.test:
            return True
        try:
            interface = 'tap-' + str(vlan)
            net_namespace = 'ovim-' + str(vlan)
            dhcp_path = os.path.join(dhcp_path, net_namespace)
            leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
            pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

            # dnsmasq range syntax: <first-ip>,<last-ip>,<netmask>
            dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask

            # make sure the per-namespace conf directory exists
            command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # read the pid recorded by a previous dnsmasq launch (empty if none)
            pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')
            command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            # check if pid is runing
            pid_status_path = content
            if content:
                command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
                self.logger.debug("command: " + command)
                (_, stdout, _) = self.ssh_conn.exec_command(command)
                content = stdout.read()
            # only start dnsmasq when no process with the recorded pid is alive
            if not content:
                command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
                          '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
                          ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \
                          ' --listen-address ' + gateway

                self.logger.debug("command: " + command)
                (_, stdout, _) = self.ssh_conn.exec_command(command)
                content = stdout.readline()

            # empty output means either dnsmasq started silently or it was already running
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("launch_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1141
1142 def delete_dhcp_interfaces(self, vlan):
1143 """
1144 Create a linux bridge with STP active
1145 :param vlan: netowrk vlan id
1146 :return:
1147 """
1148
1149 if self.test:
1150 return True
1151 try:
1152 net_namespace = 'ovim-' + str(vlan)
1153 command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + str(vlan)
1154 self.logger.debug("command: " + command)
1155 (_, stdout, _) = self.ssh_conn.exec_command(command)
1156 content = stdout.read()
1157
1158 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' down'
1159 self.logger.debug("command: " + command)
1160 (_, stdout, _) = self.ssh_conn.exec_command(command)
1161 content = stdout.read()
1162
1163 command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' down'
1164 self.logger.debug("command: " + command)
1165 (_, stdout, _) = self.ssh_conn.exec_command(command)
1166 content = stdout.read()
1167 except paramiko.ssh_exception.SSHException as e:
1168 self.logger.error("delete_dhcp_interfaces ssh Exception: " + str(e))
1169 if "SSH session not active" in str(e):
1170 self.ssh_connect()
1171 return False
1172
    def create_dhcp_interfaces(self, vlan, ip_listen_address, netmask):
        """
        Create the net namespace of a vlan and a veth pair (tap-<vlan> inside the
        namespace, ovs-tap-<vlan> attached to br-int) and configure the namespace
        end with the dhcp listen address.
        :param vlan: segmentation id
        :param ip_listen_address: Listen Ip address for the dhcp service, the tap interface living in namesapce side
        :param netmask: dhcp net CIDR
        :return: True if success
        """

        if self.test:
            return True
        try:
            net_namespace = 'ovim-' + str(vlan)
            namespace_interface = 'tap-' + str(vlan)

            # namespace that will host the dhcp service of this vlan
            command = 'sudo ip netns add ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # veth pair connecting the namespace to the OVS bridge
            command = 'sudo ip link add tap-' + str(vlan) + ' type veth peer name ovs-tap-' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # host-side end becomes a tagged port of br-int
            command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + str(vlan) + ' tag=' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # move the other end into the namespace
            command = 'sudo ip link set tap-' + str(vlan) + ' netns ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev lo up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # give the namespace end the dhcp listen address;
            # only this last command's output decides success
            command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \
                      + ' ' + ip_listen_address + ' netmask ' + netmask
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("create_dhcp_interfaces ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1238
1239
1240 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
1241 """
1242 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1243 :param vxlan_interface: vlxan inteface name.
1244 :param remote_ip: tunnel endpoint remote compute ip.
1245 :return:
1246 """
1247 if self.test:
1248 return True
1249 try:
1250 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
1251 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
1252 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
1253 self.logger.debug("command: " + command)
1254 (_, stdout, _) = self.ssh_conn.exec_command(command)
1255 content = stdout.read()
1256 # print content
1257 if len(content) == 0:
1258 return True
1259 else:
1260 return False
1261 except paramiko.ssh_exception.SSHException as e:
1262 self.logger.error("create_ovs_vxlan_tunnel ssh Exception: " + str(e))
1263 if "SSH session not active" in str(e):
1264 self.ssh_connect()
1265 return False
1266
1267 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
1268 """
1269 Delete a vlxan tunnel port from a OVS brdige.
1270 :param vxlan_interface: vlxan name to be delete it.
1271 :return: True if success.
1272 """
1273 if self.test:
1274 return True
1275 try:
1276 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1277 self.logger.debug("command: " + command)
1278 (_, stdout, _) = self.ssh_conn.exec_command(command)
1279 content = stdout.read()
1280 # print content
1281 if len(content) == 0:
1282 return True
1283 else:
1284 return False
1285 except paramiko.ssh_exception.SSHException as e:
1286 self.logger.error("delete_ovs_vxlan_tunnel ssh Exception: " + str(e))
1287 if "SSH session not active" in str(e):
1288 self.ssh_connect()
1289 return False
1290
1291 def delete_ovs_bridge(self):
1292 """
1293 Delete a OVS bridge from a compute.
1294 :return: True if success
1295 """
1296 if self.test:
1297 return True
1298 try:
1299 command = 'sudo ovs-vsctl del-br br-int'
1300 self.logger.debug("command: " + command)
1301 (_, stdout, _) = self.ssh_conn.exec_command(command)
1302 content = stdout.read()
1303 if len(content) == 0:
1304 return True
1305 else:
1306 return False
1307 except paramiko.ssh_exception.SSHException as e:
1308 self.logger.error("delete_ovs_bridge ssh Exception: " + str(e))
1309 if "SSH session not active" in str(e):
1310 self.ssh_connect()
1311 return False
1312
1313 def get_file_info(self, path):
1314 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
1315 self.logger.debug("command: " + command)
1316 (_, stdout, _) = self.ssh_conn.exec_command(command)
1317 content = stdout.read()
1318 if len(content) == 0:
1319 return None # file does not exist
1320 else:
1321 return content.split(" ") # (permission, 1, owner, group, size, date, file)
1322
1323 def qemu_get_info(self, path):
1324 command = 'qemu-img info ' + path
1325 self.logger.debug("command: " + command)
1326 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
1327 content = stdout.read()
1328 if len(content) == 0:
1329 error = stderr.read()
1330 self.logger.error("get_qemu_info error " + error)
1331 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
1332 else:
1333 try:
1334 return yaml.load(content)
1335 except yaml.YAMLError as exc:
1336 text = ""
1337 if hasattr(exc, 'problem_mark'):
1338 mark = exc.problem_mark
1339 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
1340 self.logger.error("get_qemu_info yaml format Exception " + text)
1341 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
1342
1343 def qemu_change_backing(self, inc_file, new_backing_file):
1344 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
1345 self.logger.debug("command: " + command)
1346 (_, _, stderr) = self.ssh_conn.exec_command(command)
1347 content = stderr.read()
1348 if len(content) == 0:
1349 return 0
1350 else:
1351 self.logger.error("qemu_change_backing error: " + content)
1352 return -1
1353
1354 def get_notused_filename(self, proposed_name, suffix=''):
1355 '''Look for a non existing file_name in the host
1356 proposed_name: proposed file name, includes path
1357 suffix: suffix to be added to the name, before the extention
1358 '''
1359 extension = proposed_name.rfind(".")
1360 slash = proposed_name.rfind("/")
1361 if extension < 0 or extension < slash: # no extension
1362 extension = len(proposed_name)
1363 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
1364 info = self.get_file_info(target_name)
1365 if info is None:
1366 return target_name
1367
1368 index=0
1369 while info is not None:
1370 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
1371 index+=1
1372 info = self.get_file_info(target_name)
1373 return target_name
1374
1375 def get_notused_path(self, proposed_path, suffix=''):
1376 '''Look for a non existing path at database for images
1377 proposed_path: proposed file name, includes path
1378 suffix: suffix to be added to the name, before the extention
1379 '''
1380 extension = proposed_path.rfind(".")
1381 if extension < 0:
1382 extension = len(proposed_path)
1383 if suffix != None:
1384 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
1385 index=0
1386 while True:
1387 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
1388 if r<=0:
1389 return target_path
1390 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
1391 index+=1
1392
1393
1394 def delete_file(self, file_name):
1395 command = 'rm -f '+file_name
1396 self.logger.debug("command: " + command)
1397 (_, _, stderr) = self.ssh_conn.exec_command(command)
1398 error_msg = stderr.read()
1399 if len(error_msg) > 0:
1400 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
1401
1402 def copy_file(self, source, destination, perserve_time=True):
1403 if source[0:4]=="http":
1404 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1405 dst=destination, src=source, dst_result=destination + ".result" )
1406 else:
1407 command = 'cp --no-preserve=mode'
1408 if perserve_time:
1409 command += ' --preserve=timestamps'
1410 command += " '{}' '{}'".format(source, destination)
1411 self.logger.debug("command: " + command)
1412 (_, _, stderr) = self.ssh_conn.exec_command(command)
1413 error_msg = stderr.read()
1414 if len(error_msg) > 0:
1415 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1416
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to local folder and recursively
            copy the backing files in case the remote file is incremental
            Read and/or modified self.localinfo['files'] that contain the
            unmodified copies of images in the local path
            params:
                remote_file: path of remote file
                use_incremental: None (leave the decision to this function), True, False
            return:
                local_file: name of local file
                qemu_info: dict with quemu information of local file
                use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''

        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        # http sources cannot be inspected with qemu-img, so they are treated as flat files
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info =  self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                # cached entry points at a file that no longer exists
                local_file = None
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None


        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            if use_incremental_out:
                # remember the pristine copy so future launches can reuse it
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
1484
1485 def launch_server(self, conn, server, rebuild=False, domain=None):
1486 if self.test:
1487 time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
1488 return 0, 'Success'
1489
1490 server_id = server['uuid']
1491 paused = server.get('paused','no')
1492 try:
1493 if domain!=None and rebuild==False:
1494 domain.resume()
1495 #self.server_status[server_id] = 'ACTIVE'
1496 return 0, 'Success'
1497
1498 self.db_lock.acquire()
1499 result, server_data = self.db.get_instance(server_id)
1500 self.db_lock.release()
1501 if result <= 0:
1502 self.logger.error("launch_server ERROR getting server from DB %d %s", result, server_data)
1503 return result, server_data
1504
1505 #0: get image metadata
1506 server_metadata = server.get('metadata', {})
1507 use_incremental = None
1508
1509 if "use_incremental" in server_metadata:
1510 use_incremental = False if server_metadata["use_incremental"] == "no" else True
1511
1512 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
1513 if rebuild:
1514 #delete previous incremental files
1515 for file_ in server_host_files.values():
1516 self.delete_file(file_['source file'] )
1517 server_host_files={}
1518
1519 #1: obtain aditional devices (disks)
1520 #Put as first device the main disk
1521 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
1522 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
1523 devices += server_data['extended']['devices']
1524
1525 for dev in devices:
1526 if dev['image_id'] == None:
1527 continue
1528
1529 self.db_lock.acquire()
1530 result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
1531 WHERE={'uuid': dev['image_id']})
1532 self.db_lock.release()
1533 if result <= 0:
1534 error_text = "ERROR", result, content, "when getting image", dev['image_id']
1535 self.logger.error("launch_server " + error_text)
1536 return -1, error_text
1537 if content[0]['metadata'] is not None:
1538 dev['metadata'] = json.loads(content[0]['metadata'])
1539 else:
1540 dev['metadata'] = {}
1541
1542 if dev['image_id'] in server_host_files:
1543 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
1544 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
1545 continue
1546
1547 #2: copy image to host
1548 remote_file = content[0]['path']
1549 use_incremental_image = use_incremental
1550 if dev['metadata'].get("use_incremental") == "no":
1551 use_incremental_image = False
1552 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
1553
1554 #create incremental image
1555 if use_incremental_image:
1556 local_file_inc = self.get_notused_filename(local_file, '.inc')
1557 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
1558 self.logger.debug("command: " + command)
1559 (_, _, stderr) = self.ssh_conn.exec_command(command)
1560 error_msg = stderr.read()
1561 if len(error_msg) > 0:
1562 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
1563 local_file = local_file_inc
1564 qemu_info = {'file format':'qcow2'}
1565
1566 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
1567
1568 dev['source file'] = local_file
1569 dev['file format'] = qemu_info['file format']
1570
1571 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
1572 self.localinfo_dirty = True
1573
1574 #3 Create XML
1575 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
1576 if result <0:
1577 self.logger.error("create xml server error: " + xml)
1578 return -2, xml
1579 self.logger.debug("create xml: " + xml)
1580 atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
1581 #4 Start the domain
1582 if not rebuild: #ensures that any pending destroying server is done
1583 self.server_forceoff(True)
1584 #self.logger.debug("launching instance " + xml)
1585 conn.createXML(xml, atribute)
1586 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
1587
1588 return 0, 'Success'
1589
1590 except paramiko.ssh_exception.SSHException as e:
1591 text = e.args[0]
1592 self.logger.error("launch_server id='%s' ssh Exception: %s", server_id, text)
1593 if "SSH session not active" in text:
1594 self.ssh_connect()
1595 except host_thread.lvirt_module.libvirtError as e:
1596 text = e.get_error_message()
1597 self.logger.error("launch_server id='%s' libvirt Exception: %s", server_id, text)
1598 except Exception as e:
1599 text = str(e)
1600 self.logger.error("launch_server id='%s' Exception: %s", server_id, text)
1601 return -1, text
1602
    def update_servers_status(self):
        """
        Poll libvirt for the state of all domains of this host, map each libvirt
        state to an openvim status, and persist any status change to the
        instances table and to self.server_status.
        """
        # # virDomainState
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7   #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            # one short-lived connection per poll
            conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            domains=  conn.listAllDomains()
            domain_dict={}
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                # map libvirt states to openvim statuses; unknown states map to None (ignored)
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            self.logger.error("get_state() Exception " + e.get_error_message())
            return

        # Python 2 iteritems(); domains no longer listed by libvirt are INACTIVE
        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            #change status
            self.logger.debug("server id='%s' status change from '%s' to '%s'", server_id, current_status, new_status)
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            # only mirror the change locally once the DB update succeeded
            if r>=0:
                self.server_status[server_id] = new_status
1662
1663 def action_on_server(self, req, last_retry=True):
1664 '''Perform an action on a req
1665 Attributes:
1666 req: dictionary that contain:
1667 server properties: 'uuid','name','tenant_id','status'
1668 action: 'action'
1669 host properties: 'user', 'ip_name'
1670 return (error, text)
1671 0: No error. VM is updated to new state,
1672 -1: Invalid action, as trying to pause a PAUSED VM
1673 -2: Error accessing host
1674 -3: VM nor present
1675 -4: Error at DB access
1676 -5: Error while trying to perform action. VM is updated to ERROR
1677 '''
1678 server_id = req['uuid']
1679 conn = None
1680 new_status = None
1681 old_status = req['status']
1682 last_error = None
1683
1684 if self.test:
1685 if 'terminate' in req['action']:
1686 new_status = 'deleted'
1687 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1688 if req['status']!='ERROR':
1689 time.sleep(5)
1690 new_status = 'INACTIVE'
1691 elif 'start' in req['action'] and req['status']!='ERROR':
1692 new_status = 'ACTIVE'
1693 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE':
1694 new_status = 'ACTIVE'
1695 elif 'pause' in req['action'] and req['status']!='ERROR':
1696 new_status = 'PAUSED'
1697 elif 'reboot' in req['action'] and req['status']!='ERROR':
1698 new_status = 'ACTIVE'
1699 elif 'rebuild' in req['action']:
1700 time.sleep(random.randint(20,150))
1701 new_status = 'ACTIVE'
1702 elif 'createImage' in req['action']:
1703 time.sleep(5)
1704 self.create_image(None, req)
1705 else:
1706 try:
1707 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1708 try:
1709 dom = conn.lookupByUUIDString(server_id)
1710 except host_thread.lvirt_module.libvirtError as e:
1711 text = e.get_error_message()
1712 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1713 dom = None
1714 else:
1715 self.logger.error("action_on_server id='%s' libvirt exception: %s", server_id, text)
1716 raise e
1717
1718 if 'forceOff' in req['action']:
1719 if dom == None:
1720 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1721 else:
1722 try:
1723 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1724 dom.destroy()
1725 except Exception as e:
1726 if "domain is not running" not in e.get_error_message():
1727 self.logger.error("action_on_server id='%s' Exception while sending force off: %s",
1728 server_id, e.get_error_message())
1729 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1730 new_status = 'ERROR'
1731
1732 elif 'terminate' in req['action']:
1733 if dom == None:
1734 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1735 new_status = 'deleted'
1736 else:
1737 try:
1738 if req['action']['terminate'] == 'force':
1739 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1740 dom.destroy()
1741 new_status = 'deleted'
1742 else:
1743 self.logger.debug("sending SHUTDOWN to server id='%s'", server_id)
1744 dom.shutdown()
1745 self.pending_terminate_server.append( (time.time()+10,server_id) )
1746 except Exception as e:
1747 self.logger.error("action_on_server id='%s' Exception while destroy: %s",
1748 server_id, e.get_error_message())
1749 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1750 new_status = 'ERROR'
1751 if "domain is not running" in e.get_error_message():
1752 try:
1753 dom.undefine()
1754 new_status = 'deleted'
1755 except Exception:
1756 self.logger.error("action_on_server id='%s' Exception while undefine: %s",
1757 server_id, e.get_error_message())
1758 last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
1759 #Exception: 'virDomainDetachDevice() failed'
1760 if new_status=='deleted':
1761 if server_id in self.server_status:
1762 del self.server_status[server_id]
1763 if req['uuid'] in self.localinfo['server_files']:
1764 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1765 try:
1766 self.delete_file(file_['source file'])
1767 except Exception:
1768 pass
1769 del self.localinfo['server_files'][ req['uuid'] ]
1770 self.localinfo_dirty = True
1771
1772 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1773 try:
1774 if dom == None:
1775 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1776 else:
1777 dom.shutdown()
1778 # new_status = 'INACTIVE'
1779 #TODO: check status for changing at database
1780 except Exception as e:
1781 new_status = 'ERROR'
1782 self.logger.error("action_on_server id='%s' Exception while shutdown: %s",
1783 server_id, e.get_error_message())
1784 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1785
1786 elif 'rebuild' in req['action']:
1787 if dom != None:
1788 dom.destroy()
1789 r = self.launch_server(conn, req, True, None)
1790 if r[0] <0:
1791 new_status = 'ERROR'
1792 last_error = r[1]
1793 else:
1794 new_status = 'ACTIVE'
1795 elif 'start' in req['action']:
1796 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1797 rebuild = True if req['action']['start'] == 'rebuild' else False
1798 r = self.launch_server(conn, req, rebuild, dom)
1799 if r[0] <0:
1800 new_status = 'ERROR'
1801 last_error = r[1]
1802 else:
1803 new_status = 'ACTIVE'
1804
1805 elif 'resume' in req['action']:
1806 try:
1807 if dom == None:
1808 pass
1809 else:
1810 dom.resume()
1811 # new_status = 'ACTIVE'
1812 except Exception as e:
1813 self.logger.error("action_on_server id='%s' Exception while resume: %s",
1814 server_id, e.get_error_message())
1815
1816 elif 'pause' in req['action']:
1817 try:
1818 if dom == None:
1819 pass
1820 else:
1821 dom.suspend()
1822 # new_status = 'PAUSED'
1823 except Exception as e:
1824 self.logger.error("action_on_server id='%s' Exception while pause: %s",
1825 server_id, e.get_error_message())
1826
1827 elif 'reboot' in req['action']:
1828 try:
1829 if dom == None:
1830 pass
1831 else:
1832 dom.reboot()
1833 self.logger.debug("action_on_server id='%s' reboot:", server_id)
1834 #new_status = 'ACTIVE'
1835 except Exception as e:
1836 self.logger.error("action_on_server id='%s' Exception while reboot: %s",
1837 server_id, e.get_error_message())
1838 elif 'createImage' in req['action']:
1839 self.create_image(dom, req)
1840
1841
1842 conn.close()
1843 except host_thread.lvirt_module.libvirtError as e:
1844 if conn is not None: conn.close()
1845 text = e.get_error_message()
1846 new_status = "ERROR"
1847 last_error = text
1848 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1849 self.logger.debug("action_on_server id='%s' Exception removed from host", server_id)
1850 else:
1851 self.logger.error("action_on_server id='%s' Exception %s", server_id, text)
1852 #end of if self.test
1853 if new_status == None:
1854 return 1
1855
1856 self.logger.debug("action_on_server id='%s' new status=%s %s",server_id, new_status, last_error)
1857 UPDATE = {'progress':100, 'status':new_status}
1858
1859 if new_status=='ERROR':
1860 if not last_retry: #if there will be another retry do not update database
1861 return -1
1862 elif 'terminate' in req['action']:
1863 #PUT a log in the database
1864 self.logger.error("PANIC deleting server id='%s' %s", server_id, last_error)
1865 self.db_lock.acquire()
1866 self.db.new_row('logs',
1867 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1868 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1869 )
1870 self.db_lock.release()
1871 if server_id in self.server_status:
1872 del self.server_status[server_id]
1873 return -1
1874 else:
1875 UPDATE['last_error'] = last_error
1876 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1877 self.db_lock.acquire()
1878 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1879 self.server_status[server_id] = new_status
1880 self.db_lock.release()
1881 if new_status == 'ERROR':
1882 return -1
1883 return 1
1884
1885
1886 def restore_iface(self, name, mac, lib_conn=None):
1887 ''' make an ifdown, ifup to restore default parameter of na interface
1888 Params:
1889 mac: mac address of the interface
1890 lib_conn: connection to the libvirt, if None a new connection is created
1891 Return 0,None if ok, -1,text if fails
1892 '''
1893 conn=None
1894 ret = 0
1895 error_text=None
1896 if self.test:
1897 self.logger.debug("restore_iface '%s' %s", name, mac)
1898 return 0, None
1899 try:
1900 if not lib_conn:
1901 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1902 else:
1903 conn = lib_conn
1904
1905 #wait to the pending VM deletion
1906 #TODO.Revise self.server_forceoff(True)
1907
1908 iface = conn.interfaceLookupByMACString(mac)
1909 if iface.isActive():
1910 iface.destroy()
1911 iface.create()
1912 self.logger.debug("restore_iface '%s' %s", name, mac)
1913 except host_thread.lvirt_module.libvirtError as e:
1914 error_text = e.get_error_message()
1915 self.logger.error("restore_iface '%s' '%s' libvirt exception: %s", name, mac, error_text)
1916 ret=-1
1917 finally:
1918 if lib_conn is None and conn is not None:
1919 conn.close()
1920 return ret, error_text
1921
1922
1923 def create_image(self,dom, req):
1924 if self.test:
1925 if 'path' in req['action']['createImage']:
1926 file_dst = req['action']['createImage']['path']
1927 else:
1928 createImage=req['action']['createImage']
1929 img_name= createImage['source']['path']
1930 index=img_name.rfind('/')
1931 file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
1932 image_status='ACTIVE'
1933 else:
1934 for retry in (0,1):
1935 try:
1936 server_id = req['uuid']
1937 createImage=req['action']['createImage']
1938 file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
1939 if 'path' in req['action']['createImage']:
1940 file_dst = req['action']['createImage']['path']
1941 else:
1942 img_name= createImage['source']['path']
1943 index=img_name.rfind('/')
1944 file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
1945
1946 self.copy_file(file_orig, file_dst)
1947 qemu_info = self.qemu_get_info(file_orig)
1948 if 'backing file' in qemu_info:
1949 for k,v in self.localinfo['files'].items():
1950 if v==qemu_info['backing file']:
1951 self.qemu_change_backing(file_dst, k)
1952 break
1953 image_status='ACTIVE'
1954 break
1955 except paramiko.ssh_exception.SSHException as e:
1956 image_status='ERROR'
1957 error_text = e.args[0]
1958 self.logger.error("create_image id='%s' ssh Exception: %s", server_id, error_text)
1959 if "SSH session not active" in error_text and retry==0:
1960 self.ssh_connect()
1961 except Exception as e:
1962 image_status='ERROR'
1963 error_text = str(e)
1964 self.logger.error("create_image id='%s' Exception: %s", server_id, error_text)
1965
1966 #TODO insert a last_error at database
1967 self.db_lock.acquire()
1968 self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
1969 {'uuid':req['new_image']['uuid']}, log=True)
1970 self.db_lock.release()
1971
    def edit_iface(self, port_id, old_net, new_net):
        """Re-plug a SR-IOV (model 'VF') interface on a running domain so it
        picks up new network parameters (vlan tag).

        The interface is hot-detached from its current network (when old_net)
        and/or hot-attached with the new vlan tag (when new_net) via libvirt.
        Params:
            port_id: uuid of the port at the 'ports' DB table
            old_net: truthy when the iface must be detached from its current net
            new_net: truthy when the iface must be attached to a new net
        Returns None; failures are only logged.
        """
        #This action imply remove and insert interface to put proper parameters
        if self.test:
            time.sleep(1)
        else:
        #get iface details
            self.db_lock.acquire()
            r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                    WHERE={'port_id': port_id})
            self.db_lock.release()
            if r<0:
                self.logger.error("edit_iface %s DDBB error: %s", port_id, c)
                return
            elif r==0:
                self.logger.error("edit_iface %s port not found", port_id)
                return
            port=c[0]
            # only SR-IOV virtual functions can be hot re-plugged here
            if port["model"]!="VF":
                self.logger.error("edit_iface %s ERROR model must be VF", port_id)
                return
            #create xml detach file
            xml=[]
            # NOTE(review): pci2xml presumably reads self.xml_level to indent its
            # output, so it must be set before each pci2xml call -- confirm
            self.xml_level = 2
            xml.append("<interface type='hostdev' managed='yes'>")
            xml.append(" <mac address='" +port['mac']+ "'/>")
            xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
            xml.append('</interface>')


            try:
                conn=None
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                dom = conn.lookupByUUIDString(port["instance_id"])
                if old_net:
                    text="\n".join(xml)
                    self.logger.debug("edit_iface detaching SRIOV interface " + text)
                    dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
                if new_net:
                    # reuse the detach xml, replacing the closing tag with the
                    # vlan element plus the (optional) guest-side vpci address
                    xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
                    self.xml_level = 1
                    xml.append(self.pci2xml(port.get('vpci',None)) )
                    xml.append('</interface>')
                    text="\n".join(xml)
                    self.logger.debug("edit_iface attaching SRIOV interface " + text)
                    dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("edit_iface %s libvirt exception: %s", port["instance_id"], text)

            finally:
                if conn is not None: conn.close()
2024
2025
def create_server(server, db, db_lock, only_of_ports):
    """Select a host/NUMA node and build the resource dict needed to create a VM.

    Params:
        server: server descriptor; uses 'flavor', optional 'extended',
            'networks', 'image_id', 'flavor_id', 'tenant_id' and
            optionally a preferred 'host_id'.
        db: database access object (get_numas, get_table)
        db_lock: lock serialising every db call
        only_of_ports: passed to db.get_numas to restrict dataplane port
            candidates to openflow-connected ones
    Returns:
        (0, resources) on success, where resources is the dict to insert at
        the 'instances' table (chosen host_id, cpu pinning, dataplane
        interface assignment, ...); (negative, error_text) on failure.
    Side effects: sets server['host_id'] to the chosen host.
    """
    extended = server.get('extended', None)
    requirements = {}
    requirements['numa'] = {'memory': 0, 'proc_req_type': 'threads', 'proc_req_nb': 0, 'port_list': [],
                            'sriov_list': []}
    requirements['ram'] = server['flavor'].get('ram', 0)
    if requirements['ram'] is None:
        requirements['ram'] = 0
    requirements['vcpus'] = server['flavor'].get('vcpus', 0)
    if requirements['vcpus'] is None:
        requirements['vcpus'] = 0
    #If extended is not defined get requirements from flavor
    if extended is None:
        #If extended is defined in flavor convert to dictionary and use it
        if 'extended' in server['flavor'] and server['flavor']['extended'] is not None:
            json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
            extended = json.loads(json_acceptable_string)
        else:
            extended = None

    #For simplicity only one numa VM are supported in the initial implementation
    if extended is not None:
        numas = extended.get('numas', [])
        if len(numas) > 1:
            return (-2, "Multi-NUMA VMs are not supported yet")

        #a for loop is used in order to be ready to multi-NUMA VMs
        request = []
        for numa in numas:
            numa_req = {}
            numa_req['memory'] = numa.get('memory', 0)
            if 'cores' in numa:
                numa_req['proc_req_nb'] = numa['cores']       #number of cores or threads to be reserved
                numa_req['proc_req_type'] = 'cores'           #indicates whether cores or threads must be reserved
                numa_req['proc_req_list'] = numa.get('cores-id', None)  #list of ids to be assigned to the cores or threads
            elif 'paired-threads' in numa:
                numa_req['proc_req_nb'] = numa['paired-threads']
                numa_req['proc_req_type'] = 'paired-threads'
                numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
            elif 'threads' in numa:
                numa_req['proc_req_nb'] = numa['threads']
                numa_req['proc_req_type'] = 'threads'
                numa_req['proc_req_list'] = numa.get('threads-id', None)
            else:
                numa_req['proc_req_nb'] = 0  # by default
                numa_req['proc_req_type'] = 'threads'

            #Generate a list of sriov and another for physical interfaces
            interfaces = numa.get('interfaces', [])
            sriov_list = []
            port_list = []
            for iface in interfaces:
                iface['bandwidth'] = int(iface['bandwidth'])
                if iface['dedicated'][:3] == 'yes':
                    port_list.append(iface)
                else:
                    sriov_list.append(iface)

            #Save lists ordered from more restrictive to less bw requirements
            numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
            numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)

            request.append(numa_req)

        #Search in db for an appropriate numa for each requested numa
        #at the moment multi-NUMA VMs are not supported
        if len(request) > 0:
            requirements['numa'].update(request[0])
        if requirements['numa']['memory'] > 0:
            requirements['ram'] = 0  #By the moment I make incompatible ask for both Huge and non huge pages memory
        elif requirements['ram'] == 0:
            return (-1, "Memory information not set neither at extended field nor at ram")
        if requirements['numa']['proc_req_nb'] > 0:
            requirements['vcpus'] = 0  #By the moment I make incompatible ask for both Isolated and non isolated cpus
        elif requirements['vcpus'] == 0:
            return (-1, "Processor information not set neither at extended field nor at vcpus")

    db_lock.acquire()
    result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
    db_lock.release()

    if result == -1:
        return (-1, content)

    numa_id = content['numa_id']
    host_id = content['host_id']

    #obtain threads_id and calculate pinning
    cpu_pinning = []
    reserved_threads = []
    paired = 'N'  # hoisted default so the cores loop below can never hit an unbound name
    if requirements['numa']['proc_req_nb'] > 0:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_core',
                                       SELECT=('id', 'core_id', 'thread_id'),
                                       WHERE={'numa_id': numa_id, 'instance_id': None, 'status': 'ok'})
        db_lock.release()
        if result <= 0:
            return -1, content

        #convert rows to a dictionary indexed by core_id
        cores_dict = {}
        for row in content:
            cores_dict.setdefault(row['core_id'], []).append([row['thread_id'], row['id']])

        #In case full cores are requested
        if requirements['numa']['proc_req_type'] == 'cores':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list is None:
                vcpu_id_list = list(range(0, int(requirements['numa']['proc_req_nb'])))

            for threads in cores_dict.values():
                #we need full cores
                if len(threads) != 2:
                    continue

                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #reserve so it is not used the second thread
                reserved_threads.append(threads[1][1])

                if len(vcpu_id_list) == 0:
                    break

        #In case paired threads are requested
        elif requirements['numa']['proc_req_type'] == 'paired-threads':
            paired = 'Y'
            #Get/create the list of the vcpu_ids
            if requirements['numa']['proc_req_list'] is not None:
                vcpu_id_list = []
                for pair in requirements['numa']['proc_req_list']:
                    if len(pair) != 2:
                        return -1, "Field paired-threads-id not properly specified"
                    vcpu_id_list.append(pair[0])
                    vcpu_id_list.append(pair[1])
            else:
                vcpu_id_list = list(range(0, 2 * int(requirements['numa']['proc_req_nb'])))

            for threads in cores_dict.values():
                #we need full cores
                if len(threads) != 2:
                    continue
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #set pinning for the second thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

        #In case normal threads are requested
        elif requirements['numa']['proc_req_type'] == 'threads':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list is None:
                vcpu_id_list = list(range(0, int(requirements['numa']['proc_req_nb'])))

            # prefer cores with fewer free threads to reduce fragmentation
            for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
                threads = cores_dict[threads_index]
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #if exists, set pinning for the second thread
                if len(threads) == 2 and len(vcpu_id_list) != 0:
                    cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

    #Get the source pci addresses for the selected numa
    used_sriov_ports = []
    for port in requirements['numa']['sriov_list']:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),
                                       WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None,
                                              'Mbps_used': 0})
        db_lock.release()
        if result <= 0:
            return -1, content
        for row in content:
            # skip VFs already taken by this request and the root port itself
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']
            del port['mac']
            port['port_id'] = row['id']
            port['Mbps_used'] = port['bandwidth']
            used_sriov_ports.append(row['id'])
            break

    for port in requirements['numa']['port_list']:
        port['Mbps_used'] = None
        if port['dedicated'] != "yes:sriov":
            port['mac_address'] = port['mac']
            del port['mac']
            continue
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),
                                       WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None,
                                              'Mbps_used': 0})
        db_lock.release()
        if result <= 0:
            return -1, content
        # NOTE(review): Mbps is taken from the first row even when a later row is
        # selected by the loop below -- confirm this is intended
        port['Mbps_used'] = content[0]['Mbps']
        for row in content:
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']  # mac cannot be set to passthrough ports
            del port['mac']
            port['port_id'] = row['id']
            used_sriov_ports.append(row['id'])
            break

    server['host_id'] = host_id

    #Generate dictionary for saving in db the instance resources
    resources = {}
    resources['bridged-ifaces'] = []

    numa_dict = {}
    numa_dict['interfaces'] = []

    numa_dict['interfaces'] += requirements['numa']['port_list']
    numa_dict['interfaces'] += requirements['numa']['sriov_list']

    #Check bridge information
    for control_iface in server.get('networks', []):
        control_iface['net_id'] = control_iface.pop('uuid')
        #Get the bridge name
        db_lock.acquire()
        result, content = db.get_table(FROM='nets',
                                       SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
                                               'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
                                       WHERE={'uuid': control_iface['net_id']})
        db_lock.release()
        if result < 0:
            pass  # TODO: DB error is silently ignored here; the iface is skipped
        elif result == 0:
            return -1, "Error at field networks: Not found any network with uuid %s" % control_iface['net_id']
        else:
            network = content[0]
            if control_iface.get("type", 'virtual') == 'virtual':
                if network['type'] != 'bridge_data' and network['type'] != 'bridge_man':
                    return -1, "Error at field networks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
                resources['bridged-ifaces'].append(control_iface)
                if network.get("provider") and network["provider"][0:3] == "OVS":
                    control_iface["type"] = "instance:ovs"
                else:
                    control_iface["type"] = "instance:bridge"
                if network.get("vlan"):
                    control_iface["vlan"] = network["vlan"]

                if network.get("enable_dhcp") == 'true':
                    control_iface["enable_dhcp"] = network.get("enable_dhcp")
                    control_iface["dhcp_first_ip"] = network["dhcp_first_ip"]
                    control_iface["dhcp_last_ip"] = network["dhcp_last_ip"]
                    control_iface["cidr"] = network["cidr"]
            else:
                if network['type'] != 'data' and network['type'] != 'ptp':
                    return -1, "Error at field networks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
                #dataplane interface, look for it in the numa tree and asign this network
                iface_found = False
                for dataplane_iface in numa_dict['interfaces']:
                    if dataplane_iface['name'] == control_iface.get("name"):
                        if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
                                (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
                                (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared"):
                            return -1, "Error at field networks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
                                       (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
                        dataplane_iface['uuid'] = control_iface['net_id']
                        if dataplane_iface['dedicated'] == "no":
                            dataplane_iface['vlan'] = network['vlan']
                        if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
                            dataplane_iface['mac_address'] = control_iface.get("mac_address")
                        if control_iface.get("vpci"):
                            dataplane_iface['vpci'] = control_iface.get("vpci")
                        iface_found = True
                        break
                if not iface_found:
                    return -1, "Error at field networks: interface name %s from network not found at flavor" % control_iface.get("name")

    resources['host_id'] = host_id
    resources['image_id'] = server['image_id']
    resources['flavor_id'] = server['flavor_id']
    resources['tenant_id'] = server['tenant_id']
    resources['ram'] = requirements['ram']
    resources['vcpus'] = requirements['vcpus']
    resources['status'] = 'CREATING'

    if 'description' in server:
        resources['description'] = server['description']
    if 'name' in server:
        resources['name'] = server['name']

    resources['extended'] = {}  #optional
    resources['extended']['numas'] = []
    numa_dict['numa_id'] = numa_id
    numa_dict['memory'] = requirements['numa']['memory']
    numa_dict['cores'] = []

    for core in cpu_pinning:
        numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
    for core in reserved_threads:
        numa_dict['cores'].append({'id': core})
    resources['extended']['numas'].append(numa_dict)
    if extended is not None and 'devices' in extended:  #TODO allow extra devices without numa
        resources['extended']['devices'] = extended['devices']

    return 0, resources
2362