Doing vim_db threading safe with a Lock.
[osm/openvim.git] / osm_openvim / dhcp_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 This is thread that interact with the dhcp server to get the IP addresses
26 '''
27 __author__="Pablo Montes, Alfonso Tierno"
28 __date__ ="$4-Jan-2016 12:07:15$"
29
30
31
32 import threading
33 import time
34 import Queue
35 import paramiko
36 import random
37 import subprocess
38 import logging
39
40 #TODO: insert a logging system
41
42 class dhcp_thread(threading.Thread):
43 def __init__(self, dhcp_params, db, test, dhcp_nets, logger_name=None, debug=None):
44 '''Init a thread.
45 Arguments: thread_info must be a dictionary with:
46 'dhcp_params' dhcp server parameters with the following keys:
47 mandatory : user, host, port, key, ifaces(interface name list of the one managed by the dhcp)
48 optional: password, key, port(22)
49 'db': database class threading safe
50 'test': in test mode no acces to a server is done, and ip is invented
51 '''
52 threading.Thread.__init__(self)
53 self.dhcp_params = dhcp_params
54 self.db = db
55 self.test = test
56 self.dhcp_nets = dhcp_nets
57 self.ssh_conn = None
58 if logger_name:
59 self.logger_name = logger_name
60 else:
61 self.logger_name = "openvim.dhcp"
62 self.logger = logging.getLogger(self.logger_name)
63 if debug:
64 self.logger.setLevel(getattr(logging, debug))
65
66 self.mac_status ={} #dictionary of mac_address to retrieve information
67 #ip: None
68 #retries:
69 #next_reading: time for the next trying to check ACTIVE status or IP
70 #created: time when it was added
71 #active: time when the VM becomes into ACTIVE status
72
73
74 self.queueLock = threading.Lock()
75 self.taskQueue = Queue.Queue(2000)
76
77 def ssh_connect(self):
78 try:
79 #Connect SSH
80 self.ssh_conn = paramiko.SSHClient()
81 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
82 self.ssh_conn.load_system_host_keys()
83 self.ssh_conn.connect(self.dhcp_params["host"], port=self.dhcp_params.get("port", 22),
84 username=self.dhcp_params["user"], password=self.dhcp_params.get("password"),
85 key_filename=self.dhcp_params.get("keyfile"), timeout=2)
86 except paramiko.ssh_exception.SSHException as e:
87 self.logger.error("ssh_connect ssh Exception " + str(e))
88
89 def load_mac_from_db(self):
90 #TODO get macs to follow from the database
91 self.logger.debug("load macs from db")
92 r,c = self.db.get_table(SELECT=('mac','ip_address','nets.uuid as net_id', ),
93 FROM='ports join nets on ports.net_id=nets.uuid',
94 WHERE_NOT={'ports.instance_id': None, 'nets.provider': None})
95 now = time.time()
96 self.mac_status ={}
97 if r<0:
98 self.logger.error("Error getting data from database: " + c)
99 return
100 for port in c:
101 if port["net_id"] in self.dhcp_nets:
102 self.mac_status[ port["mac"] ] = {"ip": port["ip_address"], "next_reading": now, "created": now, "retries":0}
103
104 def insert_task(self, task, *aditional):
105 try:
106 self.queueLock.acquire()
107 task = self.taskQueue.put( (task,) + aditional, timeout=5)
108 self.queueLock.release()
109 return 1, None
110 except Queue.Full:
111 return -1, "timeout inserting a task over dhcp_thread"
112
113 def run(self):
114 self.logger.debug("starting, nets: " + str(self.dhcp_nets))
115 next_iteration = time.time() + 10
116 while True:
117 self.load_mac_from_db()
118 while True:
119 try:
120 self.queueLock.acquire()
121 if not self.taskQueue.empty():
122 task = self.taskQueue.get()
123 else:
124 task = None
125 self.queueLock.release()
126
127 if task is None:
128 now=time.time()
129 if now >= next_iteration:
130 next_iteration = self.get_ip_from_dhcp()
131 else:
132 time.sleep(1)
133 continue
134
135 if task[0] == 'add':
136 self.logger.debug("processing task add mac " + str(task[1]))
137 now=time.time()
138 self.mac_status[task[1] ] = {"ip": None, "next_reading": now, "created": now, "retries":0}
139 next_iteration = now
140 elif task[0] == 'del':
141 self.logger.debug("processing task del mac " + str(task[1]))
142 if task[1] in self.mac_status:
143 del self.mac_status[task[1] ]
144 elif task[0] == 'exit':
145 self.logger.debug("processing task exit")
146 self.terminate()
147 return 0
148 else:
149 self.logger.error("unknown task: " + str(task))
150 except Exception as e:
151 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
152
153 def terminate(self):
154 try:
155 if self.ssh_conn:
156 self.ssh_conn.close()
157 except Exception as e:
158 self.logger.error("terminate Exception: " + str(e))
159 self.logger.debug("exit from dhcp_thread")
160
161 def get_ip_from_dhcp(self):
162
163 now = time.time()
164 next_iteration= now + 40000 # >10 hores
165
166 #print self.name, "Iteration"
167 for mac_address in self.mac_status:
168 if now < self.mac_status[mac_address]["next_reading"]:
169 if self.mac_status[mac_address]["next_reading"] < next_iteration:
170 next_iteration = self.mac_status[mac_address]["next_reading"]
171 continue
172
173 if self.mac_status[mac_address].get("active") == None:
174 #check from db if already active
175 r,c = self.db.get_table(FROM="ports as p join instances as i on p.instance_id=i.uuid",
176 WHERE={"p.mac": mac_address, "i.status": "ACTIVE"})
177 if r>0:
178 self.mac_status[mac_address]["active"] = now
179 self.mac_status[mac_address]["next_reading"] = (int(now)/2 +1)* 2
180 self.logger.debug("mac %s VM ACTIVE", mac_address)
181 self.mac_status[mac_address]["retries"] = 0
182 else:
183 #print self.name, "mac %s VM INACTIVE" % (mac_address)
184 if now - self.mac_status[mac_address]["created"] > 300:
185 #modify Database to tell openmano that we can not get dhcp from the machine
186 if not self.mac_status[mac_address].get("ip"):
187 r,c = self.db.update_rows("ports", {"ip_address": "0.0.0.0"}, {"mac": mac_address})
188 self.mac_status[mac_address]["ip"] = "0.0.0.0"
189 self.logger.debug("mac %s >> set to 0.0.0.0 because of timeout", mac_address)
190 self.mac_status[mac_address]["next_reading"] = (int(now)/60 +1)* 60
191 else:
192 self.mac_status[mac_address]["next_reading"] = (int(now)/6 +1)* 6
193 if self.mac_status[mac_address]["next_reading"] < next_iteration:
194 next_iteration = self.mac_status[mac_address]["next_reading"]
195 continue
196
197
198 if self.test:
199 if self.mac_status[mac_address]["retries"]>random.randint(10,100): #wait between 10 and 100 seconds to produce a fake IP
200 content = self.get_fake_ip()
201 else:
202 content = None
203 elif self.dhcp_params["host"]=="localhost":
204 try:
205 command = ['get_dhcp_lease.sh', mac_address]
206 content = subprocess.check_output(command)
207 except Exception as e:
208 self.logger.error("get_ip_from_dhcp subprocess Exception " + str(e))
209 content = None
210 else:
211 try:
212 if not self.ssh_conn:
213 self.ssh_connect()
214 command = 'get_dhcp_lease.sh ' + mac_address
215 (_, stdout, _) = self.ssh_conn.exec_command(command)
216 content = stdout.read()
217 except paramiko.ssh_exception.SSHException as e:
218 self.logger.error("get_ip_from_dhcp: ssh_Exception: " + str(e))
219 content = None
220 self.ssh_conn = None
221 except Exception as e:
222 self.logger.error("get_ip_from_dhcp: Exception: " + str(e))
223 content = None
224 self.ssh_conn = None
225
226 if content:
227 self.mac_status[mac_address]["ip"] = content
228 #modify Database
229 r,c = self.db.update_rows("ports", {"ip_address": content}, {"mac": mac_address})
230 if r<0:
231 self.logger.error("Database update error: " + c)
232 else:
233 self.mac_status[mac_address]["retries"] = 0
234 self.mac_status[mac_address]["next_reading"] = (int(now)/3600 +1)* 36000 # 10 hores
235 if self.mac_status[mac_address]["next_reading"] < next_iteration:
236 next_iteration = self.mac_status[mac_address]["next_reading"]
237 self.logger.debug("mac %s >> %s", mac_address, content)
238 continue
239 #a fail has happen
240 self.mac_status[mac_address]["retries"] +=1
241 #next iteration is every 2sec at the beginning; every 5sec after a minute, every 1min after a 5min
242 if now - self.mac_status[mac_address]["active"] > 120:
243 #modify Database to tell openmano that we can not get dhcp from the machine
244 if not self.mac_status[mac_address].get("ip"):
245 r,c = self.db.update_rows("ports", {"ip_address": "0.0.0.0"}, {"mac": mac_address})
246 self.mac_status[mac_address]["ip"] = "0.0.0.0"
247 self.logger.debug("mac %s >> set to 0.0.0.0 because of timeout", mac_address)
248
249 if now - self.mac_status[mac_address]["active"] > 60:
250 self.mac_status[mac_address]["next_reading"] = (int(now)/6 +1)* 6
251 elif now - self.mac_status[mac_address]["active"] > 300:
252 self.mac_status[mac_address]["next_reading"] = (int(now)/60 +1)* 60
253 else:
254 self.mac_status[mac_address]["next_reading"] = (int(now)/2 +1)* 2
255
256 if self.mac_status[mac_address]["next_reading"] < next_iteration:
257 next_iteration = self.mac_status[mac_address]["next_reading"]
258 return next_iteration
259
260 def get_fake_ip(self):
261 fake_ip = "192.168.{}.{}".format(random.randint(1,254), random.randint(1,254) )
262 while True:
263 #check not already provided
264 already_used = False
265 for mac_address in self.mac_status:
266 if self.mac_status[mac_address]["ip"] == fake_ip:
267 already_used = True
268 break
269 if not already_used:
270 return fake_ip
271
272
273 #EXAMPLE of bash script that must be available at the DHCP server for "isc-dhcp-server" type
274 # $ cat ./get_dhcp_lease.sh
275 # #!/bin/bash
276 # awk '
277 # ($1=="lease" && $3=="{"){ lease=$2; active="no"; found="no" }
278 # ($1=="binding" && $2=="state" && $3=="active;"){ active="yes" }
279 # ($1=="hardware" && $2=="ethernet" && $3==tolower("'$1';")){ found="yes" }
280 # ($1=="client-hostname"){ name=$2 }
281 # ($1=="}"){ if (active=="yes" && found=="yes"){ target_lease=lease; target_name=name}}
282 # END{printf("%s", target_lease)} #print target_name
283 # ' /var/lib/dhcp/dhcpd.leases
284
285