Add openflow-port-mapping CLI command
[osm/openvim.git] / dhcp_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 This is a thread that interacts with the DHCP server to get the IP addresses
26 '''
27 __author__="Pablo Montes, Alfonso Tierno"
28 __date__ ="$4-Jan-2016 12:07:15$"
29
30
31
32 import threading
33 import time
34 import Queue
35 import paramiko
36 import random
37 import subprocess
38
39 #TODO: insert a logging system
40
class dhcp_thread(threading.Thread):
    '''Thread that tracks a set of MAC addresses and stores their leased IP in the database.

    Work arrives through a task queue (see insert_task) holding tuples such as
    ('add', mac), ('del', mac) or ('exit',). Between tasks the thread polls, for
    every tracked MAC, first the database (is the owning instance ACTIVE?) and then
    the DHCP server (which IP was leased?), writing the result to ports.ip_address.
    '''

    def __init__(self, dhcp_params, db, db_lock, test, dhcp_nets, debug=None):
        '''Init a thread.
        Arguments:
            'dhcp_params': dhcp server parameters. Keys read by this class:
                mandatory: host, user
                optional: password, key (private key file name), port (default 22)
            'db', 'db_lock': database class and lock for accessing it
            'test': in test mode no access to a server is done, and the ip is invented
            'dhcp_nets': collection of net uuids; only ports on these nets are tracked
            'debug': stored in self.debug; not used by this class
        '''
        threading.Thread.__init__(self)
        self.name = "dhcp_thread"
        self.dhcp_params = dhcp_params
        self.debug = debug
        self.db = db
        self.db_lock = db_lock
        self.test = test
        self.dhcp_nets = dhcp_nets
        self.ssh_conn = None

        self.mac_status = {}  # dictionary of mac_address to retrieve information
        # ip: last known ip, None if not obtained yet
        # retries: consecutive failed readings
        # next_reading: time for the next trying to check ACTIVE status or IP
        # created: time when it was added
        # active: time when the VM becomes into ACTIVE status

        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)

    def _log(self, *items):
        '''Print the items separated by one space, like the python2 "print a, b" statement.'''
        # a single parenthesised argument is valid for both the python2 statement and the python3 function
        print(" ".join(["%s" % (item,) for item in items]))

    def ssh_connect(self):
        '''Open the ssh connection towards the dhcp server; self.ssh_conn is None on ssh failure.'''
        try:
            # Connect SSH
            self.ssh_conn = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without verification
            self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_conn.load_system_host_keys()
            self.ssh_conn.connect(self.dhcp_params["host"], port=self.dhcp_params.get("port", 22),
                                  username=self.dhcp_params["user"],
                                  password=self.dhcp_params.get("password"),
                                  key_filename=self.dhcp_params.get("key"),
                                  timeout=2)
        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0] if e.args else str(e)
            self._log(self.name, ": ssh_connect ssh Exception:", text)
            # do not keep a client that never got connected; next reading will retry
            self.ssh_conn = None

    def load_mac_from_db(self):
        '''Rebuild self.mac_status from the ports stored at database that belong to self.dhcp_nets.'''
        # TODO get macs to follow from the database
        self._log(self.name, " load macs from db")
        self.db_lock.acquire()
        try:
            r, c = self.db.get_table(SELECT=('mac', 'ip_address', 'nets.uuid as net_id', ),
                                     FROM='ports join nets on ports.net_id=nets.uuid',
                                     WHERE_NOT={'ports.instance_id': None, 'nets.provider': None})
        finally:
            # release even if the database raises, otherwise every other user of the lock blocks
            self.db_lock.release()
        now = time.time()
        self.mac_status = {}
        if r < 0:
            self._log(self.name, ": Error getting data from database:", c)
            return
        for port in c:
            if port["net_id"] in self.dhcp_nets:
                self.mac_status[port["mac"]] = {"ip": port["ip_address"], "next_reading": now,
                                                "created": now, "retries": 0}

    def insert_task(self, task, *aditional):
        '''Queue a task for the thread, waiting up to 5 seconds if the queue is full.
        Return (1, None) on success or (-1, error_text) if the queue is still full.
        '''
        # Queue.Queue does its own locking. queueLock is deliberately not taken here: holding it
        # while blocked in put() stops run() from draining the queue, and it used to stay
        # locked forever when Queue.Full was raised.
        try:
            self.taskQueue.put((task,) + aditional, timeout=5)
        except Queue.Full:
            return -1, "timeout inserting a task over host " + self.name
        return 1, None

    def run(self):
        '''Thread main loop: process queued tasks and, when idle, poll the tracked macs.'''
        self._log(self.name, " starting, nets", self.dhcp_nets)
        next_iteration = time.time() + 10
        while True:
            self.load_mac_from_db()
            while True:
                self.queueLock.acquire()
                try:
                    if not self.taskQueue.empty():
                        task = self.taskQueue.get()
                    else:
                        task = None
                finally:
                    self.queueLock.release()

                if task is None:
                    # nothing queued: poll if it is time, otherwise sleep a bit
                    now = time.time()
                    if now >= next_iteration:
                        next_iteration = self.get_ip_from_dhcp()
                    else:
                        time.sleep(1)
                    continue

                if task[0] == 'add':
                    self._log(self.name, ": processing task add mac", task[1])
                    now = time.time()
                    self.mac_status[task[1]] = {"ip": None, "next_reading": now, "created": now, "retries": 0}
                    next_iteration = now  # force a polling round as soon as the queue is empty
                elif task[0] == 'del':
                    self._log(self.name, ": processing task del mac", task[1])
                    if task[1] in self.mac_status:
                        del self.mac_status[task[1]]
                elif task[0] == 'exit':
                    self._log(self.name, ": processing task exit")
                    self.terminate()
                    return 0
                else:
                    self._log(self.name, ": unknown task", task)

    def terminate(self):
        '''Close the ssh connection, if any. Errors are printed, never raised.'''
        try:
            if self.ssh_conn:
                self.ssh_conn.close()
        except Exception as e:
            text = str(e)
            self._log(self.name, ": terminate Exception:", text)
        self._log(self.name, ": exit from host_thread")

    @staticmethod
    def _next_slot(now, period):
        '''Return the first multiple of 'period' seconds that is later than 'now'.'''
        return (int(now) // period + 1) * period

    def _update_port_ip(self, mac_address, ip_address):
        '''Write ip_address into the 'ports' row of this mac. Return what db.update_rows returns.'''
        self.db_lock.acquire()
        try:
            return self.db.update_rows("ports", {"ip_address": ip_address}, {"mac": mac_address})
        finally:
            self.db_lock.release()

    def _mark_unreachable(self, mac_address):
        '''Store "0.0.0.0" for a mac without ip, to tell openmano that no dhcp lease can be obtained.'''
        status = self.mac_status[mac_address]
        if not status.get("ip"):
            self._update_port_ip(mac_address, "0.0.0.0")
            status["ip"] = "0.0.0.0"
            self._log(self.name, "mac %s >> set to 0.0.0.0 because of timeout" % (mac_address))

    def _query_lease(self, mac_address):
        '''Return the ip leased to mac_address, or None/empty if it cannot be obtained now.'''
        if self.test:
            # fake mode: invent an ip once 'retries' exceeds a random threshold between 10 and 100
            if self.mac_status[mac_address]["retries"] > random.randint(10, 100):
                return self.get_fake_ip()
            return None

        if self.dhcp_params["host"] == "localhost":
            try:
                command = ['get_dhcp_lease.sh', mac_address]
                return subprocess.check_output(command)
            except Exception as e:
                text = str(e)
                self._log(self.name, ": get_ip_from_dhcp subprocess Exception", text)
                return None

        try:
            if not self.ssh_conn:
                self.ssh_connect()
            if not self.ssh_conn:
                return None  # connection failed and was already reported by ssh_connect
            # NOTE(review): mac_address is concatenated unquoted into a remote shell command
            command = 'get_dhcp_lease.sh ' + mac_address
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            return stdout.read()
        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0] if e.args else str(e)
            self._log(self.name, ": get_ip_from_dhcp: ssh_Exception:", text)
        except Exception as e:
            text = str(e)
            self._log(self.name, ": get_ip_from_dhcp: Exception:", text)
        # any failure drops the connection so that the next reading reconnects
        self.ssh_conn = None
        return None

    def _poll_mac(self, mac_address, now):
        '''Do one reading for a mac whose 'next_reading' time has arrived and reschedule it.'''
        status = self.mac_status[mac_address]

        if status.get("active") is None:
            # check from db if already active
            self.db_lock.acquire()
            try:
                r, _ = self.db.get_table(FROM="ports as p join instances as i on p.instance_id=i.uuid",
                                         WHERE={"p.mac": mac_address, "i.status": "ACTIVE"})
            finally:
                self.db_lock.release()
            if r > 0:
                status["active"] = now
                status["next_reading"] = self._next_slot(now, 2)
                self._log(self.name, "mac %s VM ACTIVE" % (mac_address))
                status["retries"] = 0
            elif now - status["created"] > 300:
                # not active after 5 minutes: give up on the ip and check only once a minute
                self._mark_unreachable(mac_address)
                status["next_reading"] = self._next_slot(now, 60)
            else:
                status["next_reading"] = self._next_slot(now, 6)
            return

        content = self._query_lease(mac_address)
        if content:
            status["ip"] = content
            # modify Database
            r, c = self._update_port_ip(mac_address, content)
            if r < 0:
                self._log(self.name, ": Database update error:", c)
                # fall through: handled as a failed reading so the update is retried
            else:
                status["retries"] = 0
                # re-read the lease at the next 10-hour boundary
                status["next_reading"] = self._next_slot(now, 36000)
                self._log(self.name, "mac %s >> %s" % (mac_address, content))
                return

        # a fail has happen
        status["retries"] += 1
        since_active = now - status["active"]
        if since_active > 120:
            # modify Database to tell openmano that we can not get dhcp from the machine
            self._mark_unreachable(mac_address)
        # retry every 2 sec at the beginning; every 6 sec after a minute; every 1 min after 5 min.
        # The longest threshold must be tested first, otherwise it can never match.
        if since_active > 300:
            status["next_reading"] = self._next_slot(now, 60)
        elif since_active > 60:
            status["next_reading"] = self._next_slot(now, 6)
        else:
            status["next_reading"] = self._next_slot(now, 2)

    def get_ip_from_dhcp(self):
        '''Poll every tracked mac that is due and return the time of the earliest next reading.
        When nothing is scheduled earlier, the returned time is now + 40000 seconds (> 10 hours).
        '''
        now = time.time()
        next_iteration = now + 40000  # > 10 hours

        for mac_address in self.mac_status:
            status = self.mac_status[mac_address]
            if now >= status["next_reading"]:
                self._poll_mac(mac_address, now)
            if status["next_reading"] < next_iteration:
                next_iteration = status["next_reading"]
        return next_iteration

    def get_fake_ip(self):
        '''Return a random 192.168.x.y address not assigned to any tracked mac (test mode).'''
        used = set(status.get("ip") for status in self.mac_status.values())
        while True:
            # draw a new candidate on every attempt; a single draw would loop forever on collision
            fake_ip = "192.168.%d.%d" % (random.randint(1, 254), random.randint(1, 254))
            if fake_ip not in used:
                return fake_ip
278
279
280 #EXAMPLE of bash script that must be available at the DHCP server for "isc-dhcp-server" type
281 # $ cat ./get_dhcp_lease.sh
282 # #!/bin/bash
283 # awk '
284 # ($1=="lease" && $3=="{"){ lease=$2; active="no"; found="no" }
285 # ($1=="binding" && $2=="state" && $3=="active;"){ active="yes" }
286 # ($1=="hardware" && $2=="ethernet" && $3==tolower("'$1';")){ found="yes" }
287 # ($1=="client-hostname"){ name=$2 }
288 # ($1=="}"){ if (active=="yes" && found=="yes"){ target_lease=lease; target_name=name}}
289 # END{printf("%s", target_lease)} #print target_name
290 # ' /var/lib/dhcp/dhcpd.leases
291
292