9fe52741ede763986c724308a62149cbc179b6c9
[osm/openvim.git] / osm_openvim / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 Thread that interacts with the host and with libvirt to manage VMs.
26 One thread will be launched per host.
27 '''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 # import subprocess
38 # import libvirt
39 import imp
40 import random
41 import os
42 import logging
43 from jsonschema import validate as js_v, exceptions as js_e
44 from vim_schema import localinfo_schema, hostinfo_schema
45
46
47 class host_thread(threading.Thread):
48 lvirt_module = None
49
    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
                 develop_bridge_iface, password=None, keyfile = None, logger_name=None, debug=None):
        '''Init a thread.
        Arguments:
            'id' number of thead
            'name' name of thread
            'host','user': host ip or name to manage and user
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': when True run in test mode: no ssh/libvirt connections are made
            'image_path': remote directory where VM image files are stored
            'host_id': database uuid of this compute host
            'password'/'keyfile': ssh credentials (either may be None)
            'logger_name'/'debug': logging channel name and optional level override
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test
        self.password = password
        self.keyfile = keyfile
        self.localinfo_dirty = False  # True when self.localinfo must be written back to the host

        # libvirt is imported lazily and only once (class attribute), so that
        # test mode works on machines without python-libvirt installed
        if not test and not host_thread.lvirt_module:
            try:
                module_info = imp.find_module("libvirt")
                host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
            except (IOError, ImportError) as e:
                raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))
        if logger_name:
            self.logger_name = logger_name
        else:
            self.logger_name = "openvim.host."+name
        self.logger = logging.getLogger(self.logger_name)
        if debug:
            # 'debug' is expected to be a logging level name, e.g. "DEBUG"
            self.logger.setLevel(getattr(logging, debug))


        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        # current indentation depth used by tab()/inc_tab()/dec_tab() when building XML
        self.xml_level = 0
        #self.pending ={}

        self.server_status = {} #dictionary with pairs server_uuid:server_status
        self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
        self.next_update_server_status = 0 #time when must be check servers status

        self.hostinfo = None

        # task queue feeding run(); insert_task() is the producer
        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)
        self.ssh_conn = None
        self.lvirt_conn_uri = "qemu+ssh://{user}@{host}/system?no_tty=1&no_verify=1".format(
            user=self.user, host=self.host)
        if keyfile:
            self.lvirt_conn_uri += "&keyfile=" + keyfile
107
108 def ssh_connect(self):
109 try:
110 # Connect SSH
111 self.ssh_conn = paramiko.SSHClient()
112 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
113 self.ssh_conn.load_system_host_keys()
114 self.ssh_conn.connect(self.host, username=self.user, password=self.password, key_filename=self.keyfile,
115 timeout=10) #, None)
116 except paramiko.ssh_exception.SSHException as e:
117 text = e.args[0]
118 self.logger.error("ssh_connect ssh Exception: " + text)
119
120 def load_localinfo(self):
121 if not self.test:
122 try:
123 # Connect SSH
124 self.ssh_connect()
125
126 command = 'mkdir -p ' + self.image_path
127 # print self.name, ': command:', command
128 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
129 content = stderr.read()
130 if len(content) > 0:
131 self.logger.error("command: '%s' stderr: '%s'", command, content)
132
133 command = 'cat ' + self.image_path + '/.openvim.yaml'
134 # print self.name, ': command:', command
135 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
136 content = stdout.read()
137 if len(content) == 0:
138 self.logger.error("command: '%s' stderr='%s'", command, stderr.read())
139 raise paramiko.ssh_exception.SSHException("Error empty file, command: '{}'".format(command))
140 self.localinfo = yaml.load(content)
141 js_v(self.localinfo, localinfo_schema)
142 self.localinfo_dirty = False
143 if 'server_files' not in self.localinfo:
144 self.localinfo['server_files'] = {}
145 self.logger.debug("localinfo load from host")
146 return
147
148 except paramiko.ssh_exception.SSHException as e:
149 text = e.args[0]
150 self.logger.error("load_localinfo ssh Exception: " + text)
151 except host_thread.lvirt_module.libvirtError as e:
152 text = e.get_error_message()
153 self.logger.error("load_localinfo libvirt Exception: " + text)
154 except yaml.YAMLError as exc:
155 text = ""
156 if hasattr(exc, 'problem_mark'):
157 mark = exc.problem_mark
158 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
159 self.logger.error("load_localinfo yaml format Exception " + text)
160 except js_e.ValidationError as e:
161 text = ""
162 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
163 self.logger.error("load_localinfo format Exception: %s %s", text, str(e))
164 except Exception as e:
165 text = str(e)
166 self.logger.error("load_localinfo Exception: " + text)
167
168 #not loaded, insert a default data and force saving by activating dirty flag
169 self.localinfo = {'files':{}, 'server_files':{} }
170 #self.localinfo_dirty=True
171 self.localinfo_dirty=False
172
173 def load_hostinfo(self):
174 if self.test:
175 return;
176 try:
177 #Connect SSH
178 self.ssh_connect()
179
180
181 command = 'cat ' + self.image_path + '/hostinfo.yaml'
182 #print self.name, ': command:', command
183 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
184 content = stdout.read()
185 if len(content) == 0:
186 self.logger.error("command: '%s' stderr: '%s'", command, stderr.read())
187 raise paramiko.ssh_exception.SSHException("Error empty file ")
188 self.hostinfo = yaml.load(content)
189 js_v(self.hostinfo, hostinfo_schema)
190 self.logger.debug("hostlinfo load from host " + str(self.hostinfo))
191 return
192
193 except paramiko.ssh_exception.SSHException as e:
194 text = e.args[0]
195 self.logger.error("load_hostinfo ssh Exception: " + text)
196 except host_thread.lvirt_module.libvirtError as e:
197 text = e.get_error_message()
198 self.logger.error("load_hostinfo libvirt Exception: " + text)
199 except yaml.YAMLError as exc:
200 text = ""
201 if hasattr(exc, 'problem_mark'):
202 mark = exc.problem_mark
203 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
204 self.logger.error("load_hostinfo yaml format Exception " + text)
205 except js_e.ValidationError as e:
206 text = ""
207 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
208 self.logger.error("load_hostinfo format Exception: %s %s", text, e.message)
209 except Exception as e:
210 text = str(e)
211 self.logger.error("load_hostinfo Exception: " + text)
212
213 #not loaded, insert a default data
214 self.hostinfo = None
215
    def save_localinfo(self, tries=3):
        """Write self.localinfo back to <image_path>/.openvim.yaml on the host.

        The file is streamed through the stdin of a remote 'cat >' command.
        Up to 'tries'+1 attempts are made; on an inactive ssh session the
        connection is re-established before retrying. Errors are logged,
        never raised.
        """
        if self.test:
            self.localinfo_dirty = False
            return

        while tries>=0:
            tries-=1

            try:
                command = 'cat > ' + self.image_path + '/.openvim.yaml'
                self.logger.debug("command:" + command)
                (stdin, _, _) = self.ssh_conn.exec_command(command)
                # safe_dump accepts a stream: write the YAML straight into the
                # remote command's stdin channel
                yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
                self.localinfo_dirty = False
                break #while tries

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                self.logger.error("save_localinfo ssh Exception: " + text)
                if "SSH session not active" in text:
                    # reconnect and let the while loop retry
                    self.ssh_connect()
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("save_localinfo libvirt Exception: " + text)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("save_localinfo yaml format Exception " + text)
            except Exception as e:
                text = str(e)
                self.logger.error("save_localinfo Exception: " + text)
249
    def load_servers_from_db(self):
        """Refresh self.server_status with uuid:status of the instances of this host.

        Also migrates the legacy 'inc_files' section of self.localinfo to the
        current 'server_files' layout, marking localinfo dirty when converted.
        """
        self.db_lock.acquire()
        r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
        self.db_lock.release()

        self.server_status = {}
        if r<0:
            # on error 'c' carries the error text, not rows
            self.logger.error("Error getting data from database: " + c)
            return
        for server in c:
            self.server_status[ server['uuid'] ] = server['status']

            #convert from old version to new one
            if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
                server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
                # guess qcow2 format from the file extension
                if server_files_dict['source file'][-5:] == 'qcow2':
                    server_files_dict['file format'] = 'qcow2'

                self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
        if 'inc_files' in self.localinfo:
            # old section fully migrated; drop it and schedule a save
            del self.localinfo['inc_files']
            self.localinfo_dirty = True
272
273 def delete_unused_files(self):
274 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
275 Deletes unused entries at self.loacalinfo and the corresponding local files.
276 The only reason for this mismatch is the manual deletion of instances (VM) at database
277 '''
278 if self.test:
279 return
280 for uuid,images in self.localinfo['server_files'].items():
281 if uuid not in self.server_status:
282 for localfile in images.values():
283 try:
284 self.logger.debug("deleting file '%s' of unused server '%s'", localfile['source file'], uuid)
285 self.delete_file(localfile['source file'])
286 except paramiko.ssh_exception.SSHException as e:
287 self.logger.error("Exception deleting file '%s': %s", localfile['source file'], str(e))
288 del self.localinfo['server_files'][uuid]
289 self.localinfo_dirty = True
290
291 def insert_task(self, task, *aditional):
292 try:
293 self.queueLock.acquire()
294 task = self.taskQueue.put( (task,) + aditional, timeout=5)
295 self.queueLock.release()
296 return 1, None
297 except Queue.Full:
298 return -1, "timeout inserting a task over host " + self.name
299
    def run(self):
        """Thread main loop: load host state, then consume tasks forever.

        With an empty queue it performs housekeeping: save dirty localinfo,
        poll server status every 5 seconds, or force-off overdue servers.
        A 'reload' task breaks to the outer loop (state is reloaded);
        an 'exit' task terminates the thread.
        """
        while True:
            # (re)load all host state; reached again after a 'reload' task
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                try:
                    self.queueLock.acquire()
                    if not self.taskQueue.empty():
                        task = self.taskQueue.get()
                    else:
                        task = None
                    self.queueLock.release()

                    if task is None:
                        # idle: housekeeping in priority order
                        now=time.time()
                        if self.localinfo_dirty:
                            self.save_localinfo()
                        elif self.next_update_server_status < now:
                            self.update_servers_status()
                            self.next_update_server_status = now + 5
                        elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                            self.server_forceoff()
                        else:
                            time.sleep(1)
                        continue

                    # dispatch on task name
                    if task[0] == 'instance':
                        self.logger.debug("processing task instance " + str(task[1]['action']))
                        retry = 0
                        while retry < 2:
                            retry += 1
                            # second attempt runs with last_retry=True
                            r = self.action_on_server(task[1], retry==2)
                            if r >= 0:
                                break
                    elif task[0] == 'image':
                        pass
                    elif task[0] == 'exit':
                        self.logger.debug("processing task exit")
                        self.terminate()
                        return 0
                    elif task[0] == 'reload':
                        self.logger.debug("processing task reload terminating and relaunching")
                        self.terminate()
                        break
                    elif task[0] == 'edit-iface':
                        self.logger.debug("processing task edit-iface port={}, old_net={}, new_net={}".format(
                            task[1], task[2], task[3]))
                        self.edit_iface(task[1], task[2], task[3])
                    elif task[0] == 'restore-iface':
                        self.logger.debug("processing task restore-iface={} mac={}".format(task[1], task[2]))
                        self.restore_iface(task[1], task[2])
                    elif task[0] == 'new-ovsbridge':
                        self.logger.debug("Creating compute OVS bridge")
                        self.create_ovs_bridge()
                    elif task[0] == 'new-vxlan':
                        self.logger.debug("Creating vxlan tunnel='{}', remote ip='{}'".format(task[1], task[2]))
                        self.create_ovs_vxlan_tunnel(task[1], task[2])
                    elif task[0] == 'del-ovsbridge':
                        self.logger.debug("Deleting OVS bridge")
                        self.delete_ovs_bridge()
                    elif task[0] == 'del-vxlan':
                        self.logger.debug("Deleting vxlan {} tunnel".format(task[1]))
                        self.delete_ovs_vxlan_tunnel(task[1])
                    elif task[0] == 'create-ovs-bridge-port':
                        self.logger.debug("Adding port ovim-{} to OVS bridge".format(task[1]))
                        self.create_ovs_bridge_port(task[1])
                    elif task[0] == 'del-ovs-port':
                        self.logger.debug("Delete bridge attached to ovs port vlan {} net {}".format(task[1], task[2]))
                        self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                    else:
                        self.logger.debug("unknown task " + str(task))

                except Exception as e:
                    # never let an unexpected error kill the worker thread
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
376
377 def server_forceoff(self, wait_until_finished=False):
378 while len(self.pending_terminate_server)>0:
379 now = time.time()
380 if self.pending_terminate_server[0][0]>now:
381 if wait_until_finished:
382 time.sleep(1)
383 continue
384 else:
385 return
386 req={'uuid':self.pending_terminate_server[0][1],
387 'action':{'terminate':'force'},
388 'status': None
389 }
390 self.action_on_server(req)
391 self.pending_terminate_server.pop(0)
392
393 def terminate(self):
394 try:
395 self.server_forceoff(True)
396 if self.localinfo_dirty:
397 self.save_localinfo()
398 if not self.test:
399 self.ssh_conn.close()
400 except Exception as e:
401 text = str(e)
402 self.logger.error("terminate Exception: " + text)
403 self.logger.debug("exit from host_thread")
404
405 def get_local_iface_name(self, generic_name):
406 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
407 return self.hostinfo["iface_names"][generic_name]
408 return generic_name
409
410 def create_xml_server(self, server, dev_list, server_metadata={}):
411 """Function that implements the generation of the VM XML definition.
412 Additional devices are in dev_list list
413 The main disk is upon dev_list[0]"""
414
415 #get if operating system is Windows
416 windows_os = False
417 os_type = server_metadata.get('os_type', None)
418 if os_type == None and 'metadata' in dev_list[0]:
419 os_type = dev_list[0]['metadata'].get('os_type', None)
420 if os_type != None and os_type.lower() == "windows":
421 windows_os = True
422 #get type of hard disk bus
423 bus_ide = True if windows_os else False
424 bus = server_metadata.get('bus', None)
425 if bus == None and 'metadata' in dev_list[0]:
426 bus = dev_list[0]['metadata'].get('bus', None)
427 if bus != None:
428 bus_ide = True if bus=='ide' else False
429
430 self.xml_level = 0
431
432 text = "<domain type='kvm'>"
433 #get topology
434 topo = server_metadata.get('topology', None)
435 if topo == None and 'metadata' in dev_list[0]:
436 topo = dev_list[0]['metadata'].get('topology', None)
437 #name
438 name = server.get('name', '')[:28] + "_" + server['uuid'][:28] #qemu impose a length limit of 59 chars or not start. Using 58
439 text += self.inc_tab() + "<name>" + name+ "</name>"
440 #uuid
441 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
442
443 numa={}
444 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
445 numa = server['extended']['numas'][0]
446 #memory
447 use_huge = False
448 memory = int(numa.get('memory',0))*1024*1024 #in KiB
449 if memory==0:
450 memory = int(server['ram'])*1024;
451 else:
452 if not self.develop_mode:
453 use_huge = True
454 if memory==0:
455 return -1, 'No memory assigned to instance'
456 memory = str(memory)
457 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
458 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
459 if use_huge:
460 text += self.tab()+'<memoryBacking>'+ \
461 self.inc_tab() + '<hugepages/>'+ \
462 self.dec_tab()+ '</memoryBacking>'
463
464 #cpu
465 use_cpu_pinning=False
466 vcpus = int(server.get("vcpus",0))
467 cpu_pinning = []
468 if 'cores-source' in numa:
469 use_cpu_pinning=True
470 for index in range(0, len(numa['cores-source'])):
471 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
472 vcpus += 1
473 if 'threads-source' in numa:
474 use_cpu_pinning=True
475 for index in range(0, len(numa['threads-source'])):
476 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
477 vcpus += 1
478 if 'paired-threads-source' in numa:
479 use_cpu_pinning=True
480 for index in range(0, len(numa['paired-threads-source'])):
481 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
482 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
483 vcpus += 2
484
485 if use_cpu_pinning and not self.develop_mode:
486 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
487 self.tab()+'<cputune>'
488 self.xml_level += 1
489 for i in range(0, len(cpu_pinning)):
490 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
491 text += self.dec_tab()+'</cputune>'+ \
492 self.tab() + '<numatune>' +\
493 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
494 self.dec_tab() + '</numatune>'
495 else:
496 if vcpus==0:
497 return -1, "Instance without number of cpus"
498 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
499
500 #boot
501 boot_cdrom = False
502 for dev in dev_list:
503 if dev['type']=='cdrom' :
504 boot_cdrom = True
505 break
506 text += self.tab()+ '<os>' + \
507 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
508 if boot_cdrom:
509 text += self.tab() + "<boot dev='cdrom'/>"
510 text += self.tab() + "<boot dev='hd'/>" + \
511 self.dec_tab()+'</os>'
512 #features
513 text += self.tab()+'<features>'+\
514 self.inc_tab()+'<acpi/>' +\
515 self.tab()+'<apic/>' +\
516 self.tab()+'<pae/>'+ \
517 self.dec_tab() +'</features>'
518 if topo == "oneSocket:hyperthreading":
519 if vcpus % 2 != 0:
520 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
521 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % vcpus/2
522 elif windows_os or topo == "oneSocket":
523 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
524 else:
525 text += self.tab() + "<cpu mode='host-model'></cpu>"
526 text += self.tab() + "<clock offset='utc'/>" +\
527 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
528 self.tab() + "<on_reboot>restart</on_reboot>" + \
529 self.tab() + "<on_crash>restart</on_crash>"
530 text += self.tab() + "<devices>" + \
531 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
532 self.tab() + "<serial type='pty'>" +\
533 self.inc_tab() + "<target port='0'/>" + \
534 self.dec_tab() + "</serial>" +\
535 self.tab() + "<console type='pty'>" + \
536 self.inc_tab()+ "<target type='serial' port='0'/>" + \
537 self.dec_tab()+'</console>'
538 if windows_os:
539 text += self.tab() + "<controller type='usb' index='0'/>" + \
540 self.tab() + "<controller type='ide' index='0'/>" + \
541 self.tab() + "<input type='mouse' bus='ps2'/>" + \
542 self.tab() + "<sound model='ich6'/>" + \
543 self.tab() + "<video>" + \
544 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
545 self.dec_tab() + "</video>" + \
546 self.tab() + "<memballoon model='virtio'/>" + \
547 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
548
549 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
550 #> self.dec_tab()+'</hostdev>\n' +\
551 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
552 if windows_os:
553 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
554 else:
555 #If image contains 'GRAPH' include graphics
556 #if 'GRAPH' in image:
557 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
558 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
559 self.dec_tab() + "</graphics>"
560
561 vd_index = 'a'
562 for dev in dev_list:
563 bus_ide_dev = bus_ide
564 if dev['type']=='cdrom' or dev['type']=='disk':
565 if dev['type']=='cdrom':
566 bus_ide_dev = True
567 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
568 if 'file format' in dev:
569 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
570 if 'source file' in dev:
571 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
572 #elif v['type'] == 'block':
573 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
574 #else:
575 # return -1, 'Unknown disk type ' + v['type']
576 vpci = dev.get('vpci',None)
577 if vpci == None:
578 vpci = dev['metadata'].get('vpci',None)
579 text += self.pci2xml(vpci)
580
581 if bus_ide_dev:
582 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
583 else:
584 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
585 text += self.dec_tab() + '</disk>'
586 vd_index = chr(ord(vd_index)+1)
587 elif dev['type']=='xml':
588 dev_text = dev['xml']
589 if 'vpci' in dev:
590 dev_text = dev_text.replace('__vpci__', dev['vpci'])
591 if 'source file' in dev:
592 dev_text = dev_text.replace('__file__', dev['source file'])
593 if 'file format' in dev:
594 dev_text = dev_text.replace('__format__', dev['source file'])
595 if '__dev__' in dev_text:
596 dev_text = dev_text.replace('__dev__', vd_index)
597 vd_index = chr(ord(vd_index)+1)
598 text += dev_text
599 else:
600 return -1, 'Unknown device type ' + dev['type']
601
602 net_nb=0
603 bridge_interfaces = server.get('networks', [])
604 for v in bridge_interfaces:
605 #Get the brifge name
606 self.db_lock.acquire()
607 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
608 self.db_lock.release()
609 if result <= 0:
610 self.logger.error("create_xml_server ERROR %d getting nets %s", result, content)
611 return -1, content
612 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
613 #I know it is not secure
614 #for v in sorted(desc['network interfaces'].itervalues()):
615 model = v.get("model", None)
616 if content[0]['provider']=='default':
617 text += self.tab() + "<interface type='network'>" + \
618 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
619 elif content[0]['provider'][0:7]=='macvtap':
620 text += self.tab()+"<interface type='direct'>" + \
621 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
622 self.tab() + "<target dev='macvtap0'/>"
623 if windows_os:
624 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
625 elif model==None:
626 model = "virtio"
627 elif content[0]['provider'][0:6]=='bridge':
628 text += self.tab() + "<interface type='bridge'>" + \
629 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
630 if windows_os:
631 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
632 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
633 elif model==None:
634 model = "virtio"
635 elif content[0]['provider'][0:3] == "OVS":
636 vlan = content[0]['provider'].replace('OVS:', '')
637 text += self.tab() + "<interface type='bridge'>" + \
638 self.inc_tab() + "<source bridge='ovim-" + str(vlan) + "'/>"
639 else:
640 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
641 if model!=None:
642 text += self.tab() + "<model type='" +model+ "'/>"
643 if v.get('mac_address', None) != None:
644 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
645 text += self.pci2xml(v.get('vpci',None))
646 text += self.dec_tab()+'</interface>'
647
648 net_nb += 1
649
650 interfaces = numa.get('interfaces', [])
651
652 net_nb=0
653 for v in interfaces:
654 if self.develop_mode: #map these interfaces to bridges
655 text += self.tab() + "<interface type='bridge'>" + \
656 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
657 if windows_os:
658 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
659 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
660 else:
661 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
662 if v.get('mac_address', None) != None:
663 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
664 text += self.pci2xml(v.get('vpci',None))
665 text += self.dec_tab()+'</interface>'
666 continue
667
668 if v['dedicated'] == 'yes': #passthrought
669 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
670 self.inc_tab() + "<source>"
671 self.inc_tab()
672 text += self.pci2xml(v['source'])
673 text += self.dec_tab()+'</source>'
674 text += self.pci2xml(v.get('vpci',None))
675 if windows_os:
676 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
677 text += self.dec_tab()+'</hostdev>'
678 net_nb += 1
679 else: #sriov_interfaces
680 #skip not connected interfaces
681 if v.get("net_id") == None:
682 continue
683 text += self.tab() + "<interface type='hostdev' managed='yes'>"
684 self.inc_tab()
685 if v.get('mac_address', None) != None:
686 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
687 text+= self.tab()+'<source>'
688 self.inc_tab()
689 text += self.pci2xml(v['source'])
690 text += self.dec_tab()+'</source>'
691 if v.get('vlan',None) != None:
692 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
693 text += self.pci2xml(v.get('vpci',None))
694 if windows_os:
695 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
696 text += self.dec_tab()+'</interface>'
697
698
699 text += self.dec_tab()+'</devices>'+\
700 self.dec_tab()+'</domain>'
701 return 0, text
702
703 def pci2xml(self, pci):
704 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
705 alows an empty pci text'''
706 if pci is None:
707 return ""
708 first_part = pci.split(':')
709 second_part = first_part[2].split('.')
710 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
711 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
712 "' function='0x" + second_part[1] + "'/>"
713
714 def tab(self):
715 """Return indentation according to xml_level"""
716 return "\n" + (' '*self.xml_level)
717
718 def inc_tab(self):
719 """Increment and return indentation according to xml_level"""
720 self.xml_level += 1
721 return self.tab()
722
723 def dec_tab(self):
724 """Decrement and return indentation according to xml_level"""
725 self.xml_level -= 1
726 return self.tab()
727
728 def create_ovs_bridge(self):
729 """
730 Create a bridge in compute OVS to allocate VMs
731 :return: True if success
732 """
733 if self.test:
734 return True
735 try:
736 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
737 self.logger.debug("command: " + command)
738 (_, stdout, _) = self.ssh_conn.exec_command(command)
739 content = stdout.read()
740 if len(content) == 0:
741 return True
742 else:
743 return False
744 except paramiko.ssh_exception.SSHException as e:
745 self.logger.error("create_ovs_bridge ssh Exception: " + str(e))
746 if "SSH session not active" in str(e):
747 self.ssh_connect()
748 return False
749
750 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
751 """
752 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
753 :param vlan: vlan port id
754 :param net_uuid: network id
755 :return:
756 """
757
758 if self.test:
759 return True
760 try:
761 port_name = 'ovim-' + str(vlan)
762 command = 'sudo ovs-vsctl del-port br-int ' + port_name
763 self.logger.debug("command: " + command)
764 (_, stdout, _) = self.ssh_conn.exec_command(command)
765 content = stdout.read()
766 if len(content) == 0:
767 return True
768 else:
769 return False
770 except paramiko.ssh_exception.SSHException as e:
771 self.logger.error("delete_port_to_ovs_bridge ssh Exception: " + str(e))
772 if "SSH session not active" in str(e):
773 self.ssh_connect()
774 return False
775
    def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
        """
        Delete the dhcp server process (dnsmasq) living in a network namespace.
        :param vlan: segmentation id
        :param net_uuid: network uuid
        :param dhcp_path: configuration file path on the namespace side
        :return:
        """
        if self.test:
            return True
        # do nothing while some instance OVS port still uses the net
        if not self.is_dhcp_port_free(vlan, net_uuid):
            return True
        try:
            net_namespace = 'ovim-' + str(vlan)
            dhcp_path = os.path.join(dhcp_path, net_namespace)
            pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

            # read the dnsmasq pid stored inside the namespace
            command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # kill the process by the pid just read
            command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # if len(content) == 0:
            #     return True
            # else:
            #     return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("delete_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
812
813 def is_dhcp_port_free(self, host_id, net_uuid):
814 """
815 Check if any port attached to the a net in a vxlan mesh across computes nodes
816 :param host_id: host id
817 :param net_uuid: network id
818 :return: True if is not free
819 """
820 self.db_lock.acquire()
821 result, content = self.db.get_table(
822 FROM='ports',
823 WHERE={'type': 'instance:ovs', 'net_id': net_uuid}
824 )
825 self.db_lock.release()
826
827 if len(content) > 0:
828 return False
829 else:
830 return True
831
832 def is_port_free(self, host_id, net_uuid):
833 """
834 Check if there not ovs ports of a network in a compute host.
835 :param host_id: host id
836 :param net_uuid: network id
837 :return: True if is not free
838 """
839
840 self.db_lock.acquire()
841 result, content = self.db.get_table(
842 FROM='ports as p join instances as i on p.instance_id=i.uuid',
843 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
844 )
845 self.db_lock.release()
846
847 if len(content) > 0:
848 return False
849 else:
850 return True
851
852 def add_port_to_ovs_bridge(self, vlan):
853 """
854 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
855 :param vlan: vlan port id
856 :return: True if success
857 """
858
859 if self.test:
860 return True
861 try:
862 port_name = 'ovim-' + str(vlan)
863 command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + str(vlan)
864 self.logger.debug("command: " + command)
865 (_, stdout, _) = self.ssh_conn.exec_command(command)
866 content = stdout.read()
867 if len(content) == 0:
868 return True
869 else:
870 return False
871 except paramiko.ssh_exception.SSHException as e:
872 self.logger.error("add_port_to_ovs_bridge ssh Exception: " + str(e))
873 if "SSH session not active" in str(e):
874 self.ssh_connect()
875 return False
876
877 def delete_dhcp_port(self, vlan, net_uuid):
878 """
879 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
880 :param vlan: segmentation id
881 :param net_uuid: network id
882 :return: True if success
883 """
884
885 if self.test:
886 return True
887
888 if not self.is_dhcp_port_free(vlan, net_uuid):
889 return True
890 self.delete_dhcp_interfaces(vlan)
891 return True
892
893 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
894 """
895 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
896 :param vlan:
897 :param net_uuid:
898 :return: True if success
899 """
900 if self.test:
901 return
902
903 if not self.is_port_free(vlan, net_uuid):
904 return True
905 self.delete_port_to_ovs_bridge(vlan, net_uuid)
906 self.delete_linux_bridge(vlan)
907 return True
908
909 def delete_linux_bridge(self, vlan):
910 """
911 Delete a linux bridge in a scpecific compute.
912 :param vlan: vlan port id
913 :return: True if success
914 """
915
916 if self.test:
917 return True
918 try:
919 port_name = 'ovim-' + str(vlan)
920 command = 'sudo ip link set dev veth0-' + str(vlan) + ' down'
921 self.logger.debug("command: " + command)
922 (_, stdout, _) = self.ssh_conn.exec_command(command)
923 # content = stdout.read()
924 #
925 # if len(content) != 0:
926 # return False
927 command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name
928 self.logger.debug("command: " + command)
929 (_, stdout, _) = self.ssh_conn.exec_command(command)
930 content = stdout.read()
931 if len(content) == 0:
932 return True
933 else:
934 return False
935 except paramiko.ssh_exception.SSHException as e:
936 self.logger.error("delete_linux_bridge ssh Exception: " + str(e))
937 if "SSH session not active" in str(e):
938 self.ssh_connect()
939 return False
940
941 def create_ovs_bridge_port(self, vlan):
942 """
943 Generate a linux bridge and attache the port to a OVS bridge
944 :param vlan: vlan port id
945 :return:
946 """
947 if self.test:
948 return
949 self.create_linux_bridge(vlan)
950 self.add_port_to_ovs_bridge(vlan)
951
952 def create_linux_bridge(self, vlan):
953 """
954 Create a linux bridge with STP active
955 :param vlan: netowrk vlan id
956 :return:
957 """
958
959 if self.test:
960 return True
961 try:
962 port_name = 'ovim-' + str(vlan)
963 command = 'sudo brctl show | grep ' + port_name
964 self.logger.debug("command: " + command)
965 (_, stdout, _) = self.ssh_conn.exec_command(command)
966 content = stdout.read()
967
968 # if exist nothing to create
969 # if len(content) == 0:
970 # return False
971
972 command = 'sudo brctl addbr ' + port_name
973 self.logger.debug("command: " + command)
974 (_, stdout, _) = self.ssh_conn.exec_command(command)
975 content = stdout.read()
976
977 # if len(content) == 0:
978 # return True
979 # else:
980 # return False
981
982 command = 'sudo brctl stp ' + port_name + ' on'
983 self.logger.debug("command: " + command)
984 (_, stdout, _) = self.ssh_conn.exec_command(command)
985 content = stdout.read()
986
987 # if len(content) == 0:
988 # return True
989 # else:
990 # return False
991 command = 'sudo ip link set dev ' + port_name + ' up'
992 self.logger.debug("command: " + command)
993 (_, stdout, _) = self.ssh_conn.exec_command(command)
994 content = stdout.read()
995
996 if len(content) == 0:
997 return True
998 else:
999 return False
1000 except paramiko.ssh_exception.SSHException as e:
1001 self.logger.error("create_linux_bridge ssh Exception: " + str(e))
1002 if "SSH session not active" in str(e):
1003 self.ssh_connect()
1004 return False
1005
    def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
        """
        Write into the dhcp hosts file a rule assigning a fixed ip to a specific MAC address.
        :param ip: IP address assigned to a VM
        :param mac: VM vnic mac to be matched with the IP received
        :param vlan: Segmentation id
        :param netmask: netmask value (currently unused in this method)
        :param dhcp_path: dhcp conf file path that lives in the namespace side
        :return: True if success
        """

        if self.test:
            return True

        net_namespace = 'ovim-' + str(vlan)
        # NOTE(review): net_namespace is joined twice, producing
        # <dhcp_path>/<ns>/<ns>; delete_mac_dhcp_server uses the same layout --
        # confirm this nesting is intended
        dhcp_path = os.path.join(dhcp_path, net_namespace)
        dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)

        if not ip:
            return False
        try:
            # dnsmasq host entry format: MAC,IP
            ip_data = mac.upper() + ',' + ip

            # ensure the hosts file exists inside the namespace
            command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # append the entry; bash -e makes the echo failure abort the command
            command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'

            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # empty stdout means the append succeeded
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1049
1050 def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path):
1051 """
1052 Delete into dhcp conf file the ip assigned to a specific MAC address
1053
1054 :param ip: IP address asigned to a VM
1055 :param mac: VM vnic mac to be macthed with the IP received
1056 :param vlan: Segmentation id
1057 :param dhcp_path: dhcp conf file path that live in namespace side
1058 :return:
1059 """
1060
1061 if self.test:
1062 return False
1063 try:
1064 net_namespace = 'ovim-' + str(vlan)
1065 dhcp_path = os.path.join(dhcp_path, net_namespace)
1066 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1067
1068 if not ip:
1069 return False
1070
1071 ip_data = mac.upper() + ',' + ip
1072
1073 command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir
1074 self.logger.debug("command: " + command)
1075 (_, stdout, _) = self.ssh_conn.exec_command(command)
1076 content = stdout.read()
1077
1078 if len(content) == 0:
1079 return True
1080 else:
1081 return False
1082
1083 except paramiko.ssh_exception.SSHException as e:
1084 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1085 if "SSH session not active" in str(e):
1086 self.ssh_connect()
1087 return False
1088
    def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway):
        """
        Launch a dnsmasq dhcp server inside the net namespace of a vlan, unless
        one is already running (checked via its recorded pid).
        :param vlan: Segmentation id
        :param ip_range: IP dhcp range (two-element sequence: first and last address)
        :param netmask: network netmask
        :param dhcp_path: dhcp conf file path that lives in the namespace side
        :param gateway: Gateway address for dhcp net
        :return: True if success
        """

        if self.test:
            return True
        try:
            interface = 'tap-' + str(vlan)
            net_namespace = 'ovim-' + str(vlan)
            dhcp_path = os.path.join(dhcp_path, net_namespace)
            leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
            pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

            dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask

            # make sure the config directory exists inside the namespace
            command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # read the pid of a previously launched dnsmasq (empty if none)
            # NOTE(review): pid_path duplicates pid_file above
            pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')
            command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            # check if pid is runing
            pid_status_path = content
            if content:
                command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
                self.logger.debug("command: " + command)
                (_, stdout, _) = self.ssh_conn.exec_command(command)
                content = stdout.read()
            # only start dnsmasq when no process with the recorded pid is alive
            if not content:
                command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
                          '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
                          ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \
                          ' --listen-address ' + gateway

                self.logger.debug("command: " + command)
                (_, stdout, _) = self.ssh_conn.exec_command(command)
                content = stdout.readline()

            # empty output means dnsmasq started (or was already running)
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("launch_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1148
1149 def delete_dhcp_interfaces(self, vlan):
1150 """
1151 Create a linux bridge with STP active
1152 :param vlan: netowrk vlan id
1153 :return:
1154 """
1155
1156 if self.test:
1157 return True
1158 try:
1159 net_namespace = 'ovim-' + str(vlan)
1160 command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + str(vlan)
1161 self.logger.debug("command: " + command)
1162 (_, stdout, _) = self.ssh_conn.exec_command(command)
1163 content = stdout.read()
1164
1165 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' down'
1166 self.logger.debug("command: " + command)
1167 (_, stdout, _) = self.ssh_conn.exec_command(command)
1168 content = stdout.read()
1169
1170 command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' down'
1171 self.logger.debug("command: " + command)
1172 (_, stdout, _) = self.ssh_conn.exec_command(command)
1173 content = stdout.read()
1174 except paramiko.ssh_exception.SSHException as e:
1175 self.logger.error("delete_dhcp_interfaces ssh Exception: " + str(e))
1176 if "SSH session not active" in str(e):
1177 self.ssh_connect()
1178 return False
1179
    def create_dhcp_interfaces(self, vlan, ip_listen_address, netmask):
        """
        Create the dhcp plumbing of a vlan: a net namespace with a veth pair, one
        end attached to the OVS bridge and the other configured inside the namespace.
        :param vlan: segmentation id
        :param ip_listen_address: Listen Ip address for the dhcp service, the tap interface living in namespace side
        :param netmask: dhcp net CIDR
        :return: True if success
        """

        if self.test:
            return True
        try:
            net_namespace = 'ovim-' + str(vlan)
            namespace_interface = 'tap-' + str(vlan)

            # create the namespace (results of the intermediate commands are not checked)
            command = 'sudo ip netns add ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # veth pair: tap-<vlan> (namespace side) <-> ovs-tap-<vlan> (ovs side)
            command = 'sudo ip link add tap-' + str(vlan) + ' type veth peer name ovs-tap-' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # attach the ovs side to br-int, tagged with the vlan
            command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + str(vlan) + ' tag=' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # move the tap end into the namespace
            command = 'sudo ip link set tap-' + str(vlan) + ' netns ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev lo up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # configure the listen address; only this last command decides success
            command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig  ' + namespace_interface \
                      + ' ' + ip_listen_address + ' netmask ' + netmask
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("create_dhcp_interfaces ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1245
1246
1247 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
1248 """
1249 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1250 :param vxlan_interface: vlxan inteface name.
1251 :param remote_ip: tunnel endpoint remote compute ip.
1252 :return:
1253 """
1254 if self.test:
1255 return True
1256 try:
1257 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
1258 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
1259 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
1260 self.logger.debug("command: " + command)
1261 (_, stdout, _) = self.ssh_conn.exec_command(command)
1262 content = stdout.read()
1263 # print content
1264 if len(content) == 0:
1265 return True
1266 else:
1267 return False
1268 except paramiko.ssh_exception.SSHException as e:
1269 self.logger.error("create_ovs_vxlan_tunnel ssh Exception: " + str(e))
1270 if "SSH session not active" in str(e):
1271 self.ssh_connect()
1272 return False
1273
1274 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
1275 """
1276 Delete a vlxan tunnel port from a OVS brdige.
1277 :param vxlan_interface: vlxan name to be delete it.
1278 :return: True if success.
1279 """
1280 if self.test:
1281 return True
1282 try:
1283 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1284 self.logger.debug("command: " + command)
1285 (_, stdout, _) = self.ssh_conn.exec_command(command)
1286 content = stdout.read()
1287 # print content
1288 if len(content) == 0:
1289 return True
1290 else:
1291 return False
1292 except paramiko.ssh_exception.SSHException as e:
1293 self.logger.error("delete_ovs_vxlan_tunnel ssh Exception: " + str(e))
1294 if "SSH session not active" in str(e):
1295 self.ssh_connect()
1296 return False
1297
1298 def delete_ovs_bridge(self):
1299 """
1300 Delete a OVS bridge from a compute.
1301 :return: True if success
1302 """
1303 if self.test:
1304 return True
1305 try:
1306 command = 'sudo ovs-vsctl del-br br-int'
1307 self.logger.debug("command: " + command)
1308 (_, stdout, _) = self.ssh_conn.exec_command(command)
1309 content = stdout.read()
1310 if len(content) == 0:
1311 return True
1312 else:
1313 return False
1314 except paramiko.ssh_exception.SSHException as e:
1315 self.logger.error("delete_ovs_bridge ssh Exception: " + str(e))
1316 if "SSH session not active" in str(e):
1317 self.ssh_connect()
1318 return False
1319
1320 def get_file_info(self, path):
1321 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
1322 self.logger.debug("command: " + command)
1323 (_, stdout, _) = self.ssh_conn.exec_command(command)
1324 content = stdout.read()
1325 if len(content) == 0:
1326 return None # file does not exist
1327 else:
1328 return content.split(" ") # (permission, 1, owner, group, size, date, file)
1329
1330 def qemu_get_info(self, path):
1331 command = 'qemu-img info ' + path
1332 self.logger.debug("command: " + command)
1333 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
1334 content = stdout.read()
1335 if len(content) == 0:
1336 error = stderr.read()
1337 self.logger.error("get_qemu_info error " + error)
1338 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
1339 else:
1340 try:
1341 return yaml.load(content)
1342 except yaml.YAMLError as exc:
1343 text = ""
1344 if hasattr(exc, 'problem_mark'):
1345 mark = exc.problem_mark
1346 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
1347 self.logger.error("get_qemu_info yaml format Exception " + text)
1348 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
1349
1350 def qemu_change_backing(self, inc_file, new_backing_file):
1351 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
1352 self.logger.debug("command: " + command)
1353 (_, _, stderr) = self.ssh_conn.exec_command(command)
1354 content = stderr.read()
1355 if len(content) == 0:
1356 return 0
1357 else:
1358 self.logger.error("qemu_change_backing error: " + content)
1359 return -1
1360
1361 def get_notused_filename(self, proposed_name, suffix=''):
1362 '''Look for a non existing file_name in the host
1363 proposed_name: proposed file name, includes path
1364 suffix: suffix to be added to the name, before the extention
1365 '''
1366 extension = proposed_name.rfind(".")
1367 slash = proposed_name.rfind("/")
1368 if extension < 0 or extension < slash: # no extension
1369 extension = len(proposed_name)
1370 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
1371 info = self.get_file_info(target_name)
1372 if info is None:
1373 return target_name
1374
1375 index=0
1376 while info is not None:
1377 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
1378 index+=1
1379 info = self.get_file_info(target_name)
1380 return target_name
1381
1382 def get_notused_path(self, proposed_path, suffix=''):
1383 '''Look for a non existing path at database for images
1384 proposed_path: proposed file name, includes path
1385 suffix: suffix to be added to the name, before the extention
1386 '''
1387 extension = proposed_path.rfind(".")
1388 if extension < 0:
1389 extension = len(proposed_path)
1390 if suffix != None:
1391 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
1392 index=0
1393 while True:
1394 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
1395 if r<=0:
1396 return target_path
1397 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
1398 index+=1
1399
1400
1401 def delete_file(self, file_name):
1402 command = 'rm -f '+file_name
1403 self.logger.debug("command: " + command)
1404 (_, _, stderr) = self.ssh_conn.exec_command(command)
1405 error_msg = stderr.read()
1406 if len(error_msg) > 0:
1407 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
1408
1409 def copy_file(self, source, destination, perserve_time=True):
1410 if source[0:4]=="http":
1411 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1412 dst=destination, src=source, dst_result=destination + ".result" )
1413 else:
1414 command = 'cp --no-preserve=mode'
1415 if perserve_time:
1416 command += ' --preserve=timestamps'
1417 command += " '{}' '{}'".format(source, destination)
1418 self.logger.debug("command: " + command)
1419 (_, _, stderr) = self.ssh_conn.exec_command(command)
1420 error_msg = stderr.read()
1421 if len(error_msg) > 0:
1422 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1423
    def copy_remote_file(self, remote_file, use_incremental):
        """
        Copy a file from the repository to the local image folder, recursively
        copying the backing files when the remote file is incremental.
        Reads and/or modifies self.localinfo['files'], which maps remote paths to
        the unmodified local copies of images.
        :param remote_file: path (or http url) of the remote file
        :param use_incremental: None (leave the decision to this function), True, False
        :return: (local_file, qemu_info, use_incremental_out)
            local_file: name of local file
            qemu_info: dict with qemu information of local file
            use_incremental_out: True/False; same as use_incremental, but a decision is taken when None
        """

        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                # indices 4 and 5 of get_file_info output are size and date
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None


        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            if use_incremental_out:
                # remember the pristine copy so later launches can reuse it
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
1491
1492 def launch_server(self, conn, server, rebuild=False, domain=None):
1493 if self.test:
1494 time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
1495 return 0, 'Success'
1496
1497 server_id = server['uuid']
1498 paused = server.get('paused','no')
1499 try:
1500 if domain!=None and rebuild==False:
1501 domain.resume()
1502 #self.server_status[server_id] = 'ACTIVE'
1503 return 0, 'Success'
1504
1505 self.db_lock.acquire()
1506 result, server_data = self.db.get_instance(server_id)
1507 self.db_lock.release()
1508 if result <= 0:
1509 self.logger.error("launch_server ERROR getting server from DB %d %s", result, server_data)
1510 return result, server_data
1511
1512 #0: get image metadata
1513 server_metadata = server.get('metadata', {})
1514 use_incremental = None
1515
1516 if "use_incremental" in server_metadata:
1517 use_incremental = False if server_metadata["use_incremental"] == "no" else True
1518
1519 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
1520 if rebuild:
1521 #delete previous incremental files
1522 for file_ in server_host_files.values():
1523 self.delete_file(file_['source file'] )
1524 server_host_files={}
1525
1526 #1: obtain aditional devices (disks)
1527 #Put as first device the main disk
1528 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
1529 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
1530 devices += server_data['extended']['devices']
1531
1532 for dev in devices:
1533 if dev['image_id'] == None:
1534 continue
1535
1536 self.db_lock.acquire()
1537 result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
1538 WHERE={'uuid': dev['image_id']})
1539 self.db_lock.release()
1540 if result <= 0:
1541 error_text = "ERROR", result, content, "when getting image", dev['image_id']
1542 self.logger.error("launch_server " + error_text)
1543 return -1, error_text
1544 if content[0]['metadata'] is not None:
1545 dev['metadata'] = json.loads(content[0]['metadata'])
1546 else:
1547 dev['metadata'] = {}
1548
1549 if dev['image_id'] in server_host_files:
1550 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
1551 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
1552 continue
1553
1554 #2: copy image to host
1555 remote_file = content[0]['path']
1556 use_incremental_image = use_incremental
1557 if dev['metadata'].get("use_incremental") == "no":
1558 use_incremental_image = False
1559 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
1560
1561 #create incremental image
1562 if use_incremental_image:
1563 local_file_inc = self.get_notused_filename(local_file, '.inc')
1564 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
1565 self.logger.debug("command: " + command)
1566 (_, _, stderr) = self.ssh_conn.exec_command(command)
1567 error_msg = stderr.read()
1568 if len(error_msg) > 0:
1569 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
1570 local_file = local_file_inc
1571 qemu_info = {'file format':'qcow2'}
1572
1573 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
1574
1575 dev['source file'] = local_file
1576 dev['file format'] = qemu_info['file format']
1577
1578 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
1579 self.localinfo_dirty = True
1580
1581 #3 Create XML
1582 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
1583 if result <0:
1584 self.logger.error("create xml server error: " + xml)
1585 return -2, xml
1586 self.logger.debug("create xml: " + xml)
1587 atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
1588 #4 Start the domain
1589 if not rebuild: #ensures that any pending destroying server is done
1590 self.server_forceoff(True)
1591 #self.logger.debug("launching instance " + xml)
1592 conn.createXML(xml, atribute)
1593 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
1594
1595 return 0, 'Success'
1596
1597 except paramiko.ssh_exception.SSHException as e:
1598 text = e.args[0]
1599 self.logger.error("launch_server id='%s' ssh Exception: %s", server_id, text)
1600 if "SSH session not active" in text:
1601 self.ssh_connect()
1602 except host_thread.lvirt_module.libvirtError as e:
1603 text = e.get_error_message()
1604 self.logger.error("launch_server id='%s' libvirt Exception: %s", server_id, text)
1605 except Exception as e:
1606 text = str(e)
1607 self.logger.error("launch_server id='%s' Exception: %s", server_id, text)
1608 return -1, text
1609
    def update_servers_status(self):
        """
        Poll libvirt for the state of every known domain and reconcile
        self.server_status (and the 'instances' DB table) with it.
        Does nothing in test mode or when no server is tracked.
        """
        # # virDomainState
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7   #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            # map every libvirt domain uuid to the openvim status it implies
            conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
            domains= conn.listAllDomains()
            domain_dict={}
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            self.logger.error("get_state() Exception " + e.get_error_message())
            return

        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                # domain no longer defined in libvirt: consider it stopped
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            #change status
            self.logger.debug("server id='%s' status change from '%s' to '%s'", server_id, current_status, new_status)
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            if r>=0:
                # only cache the new status once the DB row was updated
                self.server_status[server_id] = new_status
1669
1670 def action_on_server(self, req, last_retry=True):
1671 '''Perform an action on a req
1672 Attributes:
1673 req: dictionary that contain:
1674 server properties: 'uuid','name','tenant_id','status'
1675 action: 'action'
1676 host properties: 'user', 'ip_name'
1677 return (error, text)
1678 0: No error. VM is updated to new state,
1679 -1: Invalid action, as trying to pause a PAUSED VM
1680 -2: Error accessing host
1681 -3: VM nor present
1682 -4: Error at DB access
1683 -5: Error while trying to perform action. VM is updated to ERROR
1684 '''
1685 server_id = req['uuid']
1686 conn = None
1687 new_status = None
1688 old_status = req['status']
1689 last_error = None
1690
1691 if self.test:
1692 if 'terminate' in req['action']:
1693 new_status = 'deleted'
1694 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1695 if req['status']!='ERROR':
1696 time.sleep(5)
1697 new_status = 'INACTIVE'
1698 elif 'start' in req['action'] and req['status']!='ERROR':
1699 new_status = 'ACTIVE'
1700 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE':
1701 new_status = 'ACTIVE'
1702 elif 'pause' in req['action'] and req['status']!='ERROR':
1703 new_status = 'PAUSED'
1704 elif 'reboot' in req['action'] and req['status']!='ERROR':
1705 new_status = 'ACTIVE'
1706 elif 'rebuild' in req['action']:
1707 time.sleep(random.randint(20,150))
1708 new_status = 'ACTIVE'
1709 elif 'createImage' in req['action']:
1710 time.sleep(5)
1711 self.create_image(None, req)
1712 else:
1713 try:
1714 conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
1715 try:
1716 dom = conn.lookupByUUIDString(server_id)
1717 except host_thread.lvirt_module.libvirtError as e:
1718 text = e.get_error_message()
1719 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1720 dom = None
1721 else:
1722 self.logger.error("action_on_server id='%s' libvirt exception: %s", server_id, text)
1723 raise e
1724
1725 if 'forceOff' in req['action']:
1726 if dom == None:
1727 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1728 else:
1729 try:
1730 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1731 dom.destroy()
1732 except Exception as e:
1733 if "domain is not running" not in e.get_error_message():
1734 self.logger.error("action_on_server id='%s' Exception while sending force off: %s",
1735 server_id, e.get_error_message())
1736 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1737 new_status = 'ERROR'
1738
1739 elif 'terminate' in req['action']:
1740 if dom == None:
1741 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1742 new_status = 'deleted'
1743 else:
1744 try:
1745 if req['action']['terminate'] == 'force':
1746 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1747 dom.destroy()
1748 new_status = 'deleted'
1749 else:
1750 self.logger.debug("sending SHUTDOWN to server id='%s'", server_id)
1751 dom.shutdown()
1752 self.pending_terminate_server.append( (time.time()+10,server_id) )
1753 except Exception as e:
1754 self.logger.error("action_on_server id='%s' Exception while destroy: %s",
1755 server_id, e.get_error_message())
1756 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1757 new_status = 'ERROR'
1758 if "domain is not running" in e.get_error_message():
1759 try:
1760 dom.undefine()
1761 new_status = 'deleted'
1762 except Exception:
1763 self.logger.error("action_on_server id='%s' Exception while undefine: %s",
1764 server_id, e.get_error_message())
1765 last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
1766 #Exception: 'virDomainDetachDevice() failed'
1767 if new_status=='deleted':
1768 if server_id in self.server_status:
1769 del self.server_status[server_id]
1770 if req['uuid'] in self.localinfo['server_files']:
1771 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1772 try:
1773 self.delete_file(file_['source file'])
1774 except Exception:
1775 pass
1776 del self.localinfo['server_files'][ req['uuid'] ]
1777 self.localinfo_dirty = True
1778
1779 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1780 try:
1781 if dom == None:
1782 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1783 else:
1784 dom.shutdown()
1785 # new_status = 'INACTIVE'
1786 #TODO: check status for changing at database
1787 except Exception as e:
1788 new_status = 'ERROR'
1789 self.logger.error("action_on_server id='%s' Exception while shutdown: %s",
1790 server_id, e.get_error_message())
1791 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1792
1793 elif 'rebuild' in req['action']:
1794 if dom != None:
1795 dom.destroy()
1796 r = self.launch_server(conn, req, True, None)
1797 if r[0] <0:
1798 new_status = 'ERROR'
1799 last_error = r[1]
1800 else:
1801 new_status = 'ACTIVE'
1802 elif 'start' in req['action']:
1803 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1804 rebuild = True if req['action']['start'] == 'rebuild' else False
1805 r = self.launch_server(conn, req, rebuild, dom)
1806 if r[0] <0:
1807 new_status = 'ERROR'
1808 last_error = r[1]
1809 else:
1810 new_status = 'ACTIVE'
1811
1812 elif 'resume' in req['action']:
1813 try:
1814 if dom == None:
1815 pass
1816 else:
1817 dom.resume()
1818 # new_status = 'ACTIVE'
1819 except Exception as e:
1820 self.logger.error("action_on_server id='%s' Exception while resume: %s",
1821 server_id, e.get_error_message())
1822
1823 elif 'pause' in req['action']:
1824 try:
1825 if dom == None:
1826 pass
1827 else:
1828 dom.suspend()
1829 # new_status = 'PAUSED'
1830 except Exception as e:
1831 self.logger.error("action_on_server id='%s' Exception while pause: %s",
1832 server_id, e.get_error_message())
1833
1834 elif 'reboot' in req['action']:
1835 try:
1836 if dom == None:
1837 pass
1838 else:
1839 dom.reboot()
1840 self.logger.debug("action_on_server id='%s' reboot:", server_id)
1841 #new_status = 'ACTIVE'
1842 except Exception as e:
1843 self.logger.error("action_on_server id='%s' Exception while reboot: %s",
1844 server_id, e.get_error_message())
1845 elif 'createImage' in req['action']:
1846 self.create_image(dom, req)
1847
1848
1849 conn.close()
1850 except host_thread.lvirt_module.libvirtError as e:
1851 if conn is not None: conn.close()
1852 text = e.get_error_message()
1853 new_status = "ERROR"
1854 last_error = text
1855 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1856 self.logger.debug("action_on_server id='%s' Exception removed from host", server_id)
1857 else:
1858 self.logger.error("action_on_server id='%s' Exception %s", server_id, text)
1859 #end of if self.test
1860 if new_status == None:
1861 return 1
1862
1863 self.logger.debug("action_on_server id='%s' new status=%s %s",server_id, new_status, last_error)
1864 UPDATE = {'progress':100, 'status':new_status}
1865
1866 if new_status=='ERROR':
1867 if not last_retry: #if there will be another retry do not update database
1868 return -1
1869 elif 'terminate' in req['action']:
1870 #PUT a log in the database
1871 self.logger.error("PANIC deleting server id='%s' %s", server_id, last_error)
1872 self.db_lock.acquire()
1873 self.db.new_row('logs',
1874 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1875 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1876 )
1877 self.db_lock.release()
1878 if server_id in self.server_status:
1879 del self.server_status[server_id]
1880 return -1
1881 else:
1882 UPDATE['last_error'] = last_error
1883 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1884 self.db_lock.acquire()
1885 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1886 self.server_status[server_id] = new_status
1887 self.db_lock.release()
1888 if new_status == 'ERROR':
1889 return -1
1890 return 1
1891
1892
1893 def restore_iface(self, name, mac, lib_conn=None):
1894 ''' make an ifdown, ifup to restore default parameter of na interface
1895 Params:
1896 mac: mac address of the interface
1897 lib_conn: connection to the libvirt, if None a new connection is created
1898 Return 0,None if ok, -1,text if fails
1899 '''
1900 conn=None
1901 ret = 0
1902 error_text=None
1903 if self.test:
1904 self.logger.debug("restore_iface '%s' %s", name, mac)
1905 return 0, None
1906 try:
1907 if not lib_conn:
1908 conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
1909 else:
1910 conn = lib_conn
1911
1912 #wait to the pending VM deletion
1913 #TODO.Revise self.server_forceoff(True)
1914
1915 iface = conn.interfaceLookupByMACString(mac)
1916 if iface.isActive():
1917 iface.destroy()
1918 iface.create()
1919 self.logger.debug("restore_iface '%s' %s", name, mac)
1920 except host_thread.lvirt_module.libvirtError as e:
1921 error_text = e.get_error_message()
1922 self.logger.error("restore_iface '%s' '%s' libvirt exception: %s", name, mac, error_text)
1923 ret=-1
1924 finally:
1925 if lib_conn is None and conn is not None:
1926 conn.close()
1927 return ret, error_text
1928
1929
1930 def create_image(self,dom, req):
1931 if self.test:
1932 if 'path' in req['action']['createImage']:
1933 file_dst = req['action']['createImage']['path']
1934 else:
1935 createImage=req['action']['createImage']
1936 img_name= createImage['source']['path']
1937 index=img_name.rfind('/')
1938 file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
1939 image_status='ACTIVE'
1940 else:
1941 for retry in (0,1):
1942 try:
1943 server_id = req['uuid']
1944 createImage=req['action']['createImage']
1945 file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
1946 if 'path' in req['action']['createImage']:
1947 file_dst = req['action']['createImage']['path']
1948 else:
1949 img_name= createImage['source']['path']
1950 index=img_name.rfind('/')
1951 file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
1952
1953 self.copy_file(file_orig, file_dst)
1954 qemu_info = self.qemu_get_info(file_orig)
1955 if 'backing file' in qemu_info:
1956 for k,v in self.localinfo['files'].items():
1957 if v==qemu_info['backing file']:
1958 self.qemu_change_backing(file_dst, k)
1959 break
1960 image_status='ACTIVE'
1961 break
1962 except paramiko.ssh_exception.SSHException as e:
1963 image_status='ERROR'
1964 error_text = e.args[0]
1965 self.logger.error("create_image id='%s' ssh Exception: %s", server_id, error_text)
1966 if "SSH session not active" in error_text and retry==0:
1967 self.ssh_connect()
1968 except Exception as e:
1969 image_status='ERROR'
1970 error_text = str(e)
1971 self.logger.error("create_image id='%s' Exception: %s", server_id, error_text)
1972
1973 #TODO insert a last_error at database
1974 self.db_lock.acquire()
1975 self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
1976 {'uuid':req['new_image']['uuid']}, log=True)
1977 self.db_lock.release()
1978
    def edit_iface(self, port_id, old_net, new_net):
        """Move a SR-IOV (VF) dataplane interface between networks.

        Implemented as a hot detach of the hostdev interface (when it was
        attached to old_net) followed by a re-attach with the new vlan tag
        (when new_net is set); a plain parameter edit is not possible, so the
        device is removed and inserted with the proper parameters.
        Params:
            port_id: uuid of the port at the 'ports' table
            old_net: previous network (truthy -> detach is needed)
            new_net: new network (truthy -> attach is needed)
        Return: None; errors are only logged.
        """
        #This action imply remove and insert interface to put proper parameters
        if self.test:
            time.sleep(1)
        else:
            #get iface details
            self.db_lock.acquire()
            r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                    WHERE={'port_id': port_id})
            self.db_lock.release()
            if r<0:
                self.logger.error("edit_iface %s DDBB error: %s", port_id, c)
                return
            elif r==0:
                self.logger.error("edit_iface %s port not found", port_id)
                return
            port=c[0]
            # only SR-IOV virtual functions can be hot detached/attached this way
            if port["model"]!="VF":
                self.logger.error("edit_iface %s ERROR model must be VF", port_id)
                return
            #create xml detach file
            xml=[]
            self.xml_level = 2
            xml.append("<interface type='hostdev' managed='yes'>")
            xml.append(" <mac address='" +port['mac']+ "'/>")
            xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
            xml.append('</interface>')

            try:
                # conn is pre-set to None so the finally clause is safe even if open() raises
                conn=None
                conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
                dom = conn.lookupByUUIDString(port["instance_id"])
                if old_net:
                    text="\n".join(xml)
                    self.logger.debug("edit_iface detaching SRIOV interface " + text)
                    dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
                if new_net:
                    # replace the closing tag with the new vlan, then re-close the element
                    xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
                    self.xml_level = 1
                    xml.append(self.pci2xml(port.get('vpci',None)) )
                    xml.append('</interface>')
                    text="\n".join(xml)
                    self.logger.debug("edit_iface attaching SRIOV interface " + text)
                    dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("edit_iface %s libvirt exception: %s", port["instance_id"], text)

            finally:
                if conn is not None: conn.close()
2031
2032
def create_server(server, db, db_lock, only_of_ports):
    """Compute the resource assignation (numa, pinned cores, dataplane ports,
    bridge interfaces) needed to deploy a server, reserving a numa via the db.

    Params:
        server: creation request; must contain 'flavor', 'image_id',
            'flavor_id', 'tenant_id'; optionally 'extended', 'networks',
            'host_id', 'description', 'name'. server['host_id'] is set here.
        db: database instance offering get_numas()/get_table()
        db_lock: lock for exclusive database access
        only_of_ports: passed through to db.get_numas to restrict dataplane
            ports to openflow-connected ones
    Return:
        (0, resources-dict) on success, (-1 or -2, error-text) on failure.
    Note: only single-NUMA VMs are supported.
    """
    extended = server.get('extended', None)
    requirements = {}
    requirements['numa'] = {'memory': 0, 'proc_req_type': 'threads', 'proc_req_nb': 0, 'port_list': [],
                            'sriov_list': []}
    requirements['ram'] = server['flavor'].get('ram', 0)
    if requirements['ram'] == None:
        requirements['ram'] = 0
    requirements['vcpus'] = server['flavor'].get('vcpus', 0)
    if requirements['vcpus'] == None:
        requirements['vcpus'] = 0
    #If extended is not defined get requirements from flavor
    if extended is None:
        #If extended is defined in flavor convert to dictionary and use it
        if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
            json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
            extended = json.loads(json_acceptable_string)
        else:
            extended = None

    #For simplicity only one numa VM are supported in the initial implementation
    if extended != None:
        numas = extended.get('numas', [])
        if len(numas) > 1:
            return (-2, "Multi-NUMA VMs are not supported yet")

        #a for loop is used in order to be ready to multi-NUMA VMs
        request = []
        for numa in numas:
            numa_req = {}
            numa_req['memory'] = numa.get('memory', 0)
            if 'cores' in numa:
                numa_req['proc_req_nb'] = numa['cores']  #number of cores or threads to be reserved
                numa_req['proc_req_type'] = 'cores'  #indicates whether cores or threads must be reserved
                numa_req['proc_req_list'] = numa.get('cores-id', None)  #list of ids to be assigned to the cores or threads
            elif 'paired-threads' in numa:
                numa_req['proc_req_nb'] = numa['paired-threads']
                numa_req['proc_req_type'] = 'paired-threads'
                numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
            elif 'threads' in numa:
                numa_req['proc_req_nb'] = numa['threads']
                numa_req['proc_req_type'] = 'threads'
                numa_req['proc_req_list'] = numa.get('threads-id', None)
            else:
                numa_req['proc_req_nb'] = 0  # by default
                numa_req['proc_req_type'] = 'threads'

            #Generate a list of sriov and another for physical interfaces
            interfaces = numa.get('interfaces', [])
            sriov_list = []
            port_list = []
            for iface in interfaces:
                iface['bandwidth'] = int(iface['bandwidth'])
                if iface['dedicated'][:3] == 'yes':  # matches 'yes' and 'yes:sriov'
                    port_list.append(iface)
                else:
                    sriov_list.append(iface)

            #Save lists ordered from more restrictive to less bw requirements
            numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
            numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)

            request.append(numa_req)

        #Search in db for an appropriate numa for each requested numa
        #at the moment multi-NUMA VMs are not supported
        if len(request) > 0:
            requirements['numa'].update(request[0])
        if requirements['numa']['memory'] > 0:
            requirements['ram'] = 0  #By the moment I make incompatible ask for both Huge and non huge pages memory
        elif requirements['ram'] == 0:
            return (-1, "Memory information not set neither at extended field not at ram")
        if requirements['numa']['proc_req_nb'] > 0:
            requirements['vcpus'] = 0  #By the moment I make incompatible ask for both Isolated and non isolated cpus
        elif requirements['vcpus'] == 0:
            return (-1, "Processor information not set neither at extended field not at vcpus")

    db_lock.acquire()
    result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
    db_lock.release()

    if result == -1:
        return (-1, content)

    numa_id = content['numa_id']
    host_id = content['host_id']

    #obtain threads_id and calculate pinning
    cpu_pinning = []
    reserved_threads = []
    paired = 'N'  # hoisted: always defined for the cores dump below, even with no cpu request
    if requirements['numa']['proc_req_nb'] > 0:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_core',
                                       SELECT=('id', 'core_id', 'thread_id'),
                                       WHERE={'numa_id': numa_id, 'instance_id': None, 'status': 'ok'})
        db_lock.release()
        if result <= 0:
            return -1, content

        #convert rows to a dictionary indexed by core_id
        cores_dict = {}
        for row in content:
            if not row['core_id'] in cores_dict:
                cores_dict[row['core_id']] = []
            cores_dict[row['core_id']].append([row['thread_id'], row['id']])

        #In case full cores are requested
        if requirements['numa']['proc_req_type'] == 'cores':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list == None:
                vcpu_id_list = range(0, int(requirements['numa']['proc_req_nb']))

            for threads in cores_dict.itervalues():
                #we need full cores
                if len(threads) != 2:
                    continue

                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #reserve so it is not used the second thread
                reserved_threads.append(threads[1][1])

                if len(vcpu_id_list) == 0:
                    break

        #In case paired threads are requested
        elif requirements['numa']['proc_req_type'] == 'paired-threads':
            paired = 'Y'
            #Get/create the list of the vcpu_ids
            if requirements['numa']['proc_req_list'] != None:
                vcpu_id_list = []
                for pair in requirements['numa']['proc_req_list']:
                    if len(pair) != 2:
                        # fix: an unreachable bare 'return' that followed this statement was removed
                        return -1, "Field paired-threads-id not properly specified"
                    vcpu_id_list.append(pair[0])
                    vcpu_id_list.append(pair[1])
            else:
                vcpu_id_list = range(0, 2 * int(requirements['numa']['proc_req_nb']))

            for threads in cores_dict.itervalues():
                #we need full cores
                if len(threads) != 2:
                    continue
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #set pinning for the second thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

        #In case normal threads are requested
        elif requirements['numa']['proc_req_type'] == 'threads':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list == None:
                vcpu_id_list = range(0, int(requirements['numa']['proc_req_nb']))

            # visit cores with fewer free threads first, keeping full cores available
            for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
                threads = cores_dict[threads_index]
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #if exists, set pinning for the second thread
                if len(threads) == 2 and len(vcpu_id_list) != 0:
                    cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

    #Get the source pci addresses for the selected numa
    used_sriov_ports = []
    for port in requirements['numa']['sriov_list']:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),
                                       WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None,
                                              'Mbps_used': 0})
        db_lock.release()
        if result <= 0:
            return -1, content
        for row in content:
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']
            del port['mac']
            port['port_id'] = row['id']
            port['Mbps_used'] = port['bandwidth']
            used_sriov_ports.append(row['id'])
            break

    for port in requirements['numa']['port_list']:
        port['Mbps_used'] = None
        if port['dedicated'] != "yes:sriov":
            port['mac_address'] = port['mac']
            del port['mac']
            continue
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),
                                       WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None,
                                              'Mbps_used': 0})
        db_lock.release()
        if result <= 0:
            return -1, content
        port['Mbps_used'] = content[0]['Mbps']
        for row in content:
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']  # mac cannot be set to passthrough ports
            del port['mac']
            port['port_id'] = row['id']
            used_sriov_ports.append(row['id'])
            break

    server['host_id'] = host_id

    #Generate dictionary for saving in db the instance resources
    resources = {}
    resources['bridged-ifaces'] = []

    numa_dict = {}
    numa_dict['interfaces'] = []
    numa_dict['interfaces'] += requirements['numa']['port_list']
    numa_dict['interfaces'] += requirements['numa']['sriov_list']
    # note: a never-read 'unified_dataplane_iface' list was removed here (unused local)

    #Check bridge information
    for control_iface in server.get('networks', []):
        control_iface['net_id'] = control_iface.pop('uuid')
        #Get the brifge name
        db_lock.acquire()
        result, content = db.get_table(FROM='nets',
                                       SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
                                               'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
                                       WHERE={'uuid': control_iface['net_id']})
        db_lock.release()
        if result < 0:
            pass  # network not readable: interface left untouched (original best-effort behavior)
        elif result == 0:
            return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
        else:
            network = content[0]
            if control_iface.get("type", 'virtual') == 'virtual':
                if network['type'] != 'bridge_data' and network['type'] != 'bridge_man':
                    return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
                resources['bridged-ifaces'].append(control_iface)
                if network.get("provider") and network["provider"][0:3] == "OVS":
                    control_iface["type"] = "instance:ovs"
                else:
                    control_iface["type"] = "instance:bridge"
                if network.get("vlan"):
                    control_iface["vlan"] = network["vlan"]

                if network.get("enable_dhcp") == 'true':
                    # propagate dhcp range so it can be served for this interface
                    control_iface["enable_dhcp"] = network.get("enable_dhcp")
                    control_iface["dhcp_first_ip"] = network["dhcp_first_ip"]
                    control_iface["dhcp_last_ip"] = network["dhcp_last_ip"]
                    control_iface["cidr"] = network["cidr"]
            else:
                if network['type'] != 'data' and network['type'] != 'ptp':
                    return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
                #dataplane interface, look for it in the numa tree and asign this network
                iface_found = False
                for dataplane_iface in numa_dict['interfaces']:
                    if dataplane_iface['name'] == control_iface.get("name"):
                        if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
                                (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
                                (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared"):
                            return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
                                       (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
                        dataplane_iface['uuid'] = control_iface['net_id']
                        if dataplane_iface['dedicated'] == "no":
                            dataplane_iface['vlan'] = network['vlan']
                        if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
                            dataplane_iface['mac_address'] = control_iface.get("mac_address")
                        if control_iface.get("vpci"):
                            dataplane_iface['vpci'] = control_iface.get("vpci")
                        iface_found = True
                        break
                if not iface_found:
                    return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")

    resources['host_id'] = host_id
    resources['image_id'] = server['image_id']
    resources['flavor_id'] = server['flavor_id']
    resources['tenant_id'] = server['tenant_id']
    resources['ram'] = requirements['ram']
    resources['vcpus'] = requirements['vcpus']
    resources['status'] = 'CREATING'

    if 'description' in server:
        resources['description'] = server['description']
    if 'name' in server:
        resources['name'] = server['name']

    resources['extended'] = {}  #optional
    resources['extended']['numas'] = []
    numa_dict['numa_id'] = numa_id
    numa_dict['memory'] = requirements['numa']['memory']
    numa_dict['cores'] = []

    for core in cpu_pinning:
        numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
    for core in reserved_threads:
        numa_dict['cores'].append({'id': core})
    resources['extended']['numas'].append(numa_dict)
    if extended != None and 'devices' in extended:  #TODO allow extra devices without numa
        resources['extended']['devices'] = extended['devices']

    return 0, resources
2369