RIFT OSM R1 Initial Submission
[osm/SO.git] / examples / ping_pong_ns / ping_pong_ns / pong.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from datetime import date
18 from Queue import Queue
19 import logging
20 import json
21 import socket
22 import threading
23 import time
24
25 import tornado.web
26
27 from util.util import get_url_target
28
class Stats(object):
    """Holder for the request and response counters.

    Every read and every write of a counter is performed while holding a
    single internal lock.  A compound update such as
    ``stats.request_count += 1`` is a locked read followed by a separate
    locked write.
    """

    def __init__(self):
        self._lock = threading.Lock()

        self._request_count = 0
        self._response_count = 0

    def _read_request_count(self):
        self._lock.acquire()
        try:
            return self._request_count
        finally:
            self._lock.release()

    def _write_request_count(self, value):
        self._lock.acquire()
        try:
            self._request_count = value
        finally:
            self._lock.release()

    def _read_response_count(self):
        self._lock.acquire()
        try:
            return self._response_count
        finally:
            self._lock.release()

    def _write_response_count(self, value):
        self._lock.acquire()
        try:
            self._response_count = value
        finally:
            self._lock.release()

    request_count = property(
        _read_request_count,
        _write_request_count,
        doc="Number of requests counted so far.",
    )

    response_count = property(
        _read_response_count,
        _write_response_count,
        doc="Number of responses counted so far.",
    )
55
class Worker(threading.Thread):
    """Thread that echoes received data back on queued connections.

    The worker repeatedly takes a connection from the shared queue, reads up
    to 1024 bytes from it, sends the same bytes back and returns the
    connection to the queue so that any worker can serve it again.

    Args:
        log: logger used for debug and error messages.
        connections: queue of connection objects (``recv``/``sendall``/
            ``close``) shared with the listener and the other workers.
        stats: object exposing ``request_count`` and ``response_count``
            attributes that are incremented per request / response.
    """

    # Seconds to wait for a queued connection before re-checking ``running``.
    # Waiting (instead of polling with ``get_nowait``) keeps an idle worker
    # from spinning at full CPU.
    POLL_INTERVAL = 0.1

    def __init__(self, log, connections, stats):
        super(Worker, self).__init__()
        self._log = log
        self._connections = connections
        self._stats = stats

        self._running = True

        self._lock = threading.Lock()

    @property
    def running(self):
        """Whether the ``run`` loop should keep going."""
        return self._running

    @running.setter
    def running(self, value):
        self._running = value

    def run(self):
        """Serve queued connections until ``running`` is set to False."""
        # Imported locally so the block needs no change to the file's import
        # section; the fallback matches the Python 2 ``Queue`` module that
        # the file imports.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        while self.running:
            try:
                connection = self._connections.get(True, self.POLL_INTERVAL)
            except Empty:
                continue

            try:
                req = connection.recv(1024)
            except socket.error as msg:
                self._log.error("error with connection read: %s", msg)
                self._connections.put(connection)
                continue

            if not req:
                # An empty read means the peer closed the connection.
                # Re-queueing it would make the workers read it again and
                # again and would never release the socket, so close it.
                self._log.debug("connection closed by peer")
                try:
                    connection.close()
                except socket.error as msg:
                    self._log.error("error closing connection: %s", msg)
                continue

            # Decoded for logging only; 'replace' keeps malformed or
            # truncated UTF-8 from raising and killing the thread.
            self._log.debug("got: %s", req.decode('UTF-8', 'replace'))

            # NOTE: ``+=`` is a separate read and write on the stats object,
            # so concurrent workers may lose an increment.
            self._stats.request_count += 1

            try:
                # Echo the exact bytes that were received.
                connection.sendall(req)
                self._stats.response_count += 1
            except socket.error as msg:
                self._log.error("error with connection write: %s", msg)
                self._connections.put(connection)
                continue

            self._connections.put(connection)
107
class Pong(object):
    """TCP listener that hands accepted connections to a pool of workers.

    ``start`` launches a listener thread plus the worker threads; ``stop``
    shuts them down and closes every connection still waiting in the queue.

    Args:
        worker_count: number of ``Worker`` threads to create.
    """

    def __init__(self, worker_count=5):
        self._log = logging.getLogger("pong")
        self._log.setLevel(logging.DEBUG)

        self.listen_ip = None
        self.listen_port = None

        self._lock = threading.Lock()

        self._connections = Queue()

        self._stats = Stats()

        self._enabled = False

        # Initialised here so close_socket() and stop() are safe to call
        # before the listener thread has ever run.
        self._socket = None
        self.listener_thread = None

        self._workers = [self._new_worker() for _ in range(worker_count)]

    def _new_worker(self):
        """Create a worker bound to this instance's log, queue and stats."""
        return Worker(self._log, self._connections, self._stats)

    @property
    def listen_port(self):
        """Configured listen port (None until set)."""
        return self._listen_port

    @listen_port.setter
    def listen_port(self, value):
        self._log.debug("new listen port: %s" % value)
        self._listen_port = value

    @property
    def listen_ip(self):
        """Configured listen address (None until set)."""
        return self._listen_ip

    @listen_ip.setter
    def listen_ip(self, value):
        self._log.debug("listen pong ip: %s" % value)
        self._listen_ip = value

    @property
    def enabled(self):
        """True between a call to start() and the next call to stop()."""
        with self._lock:
            return self._enabled

    @property
    def request_count(self):
        """Request counter taken from the shared stats object."""
        return self._stats.request_count

    @property
    def response_count(self):
        """Response counter taken from the shared stats object."""
        return self._stats.response_count

    def start(self):
        """Start the listener thread and the worker threads.

        Calling start() while already enabled is a no-op.
        """
        with self._lock:
            if self._enabled:
                self._log.debug("already started")
                return
            self._enabled = True

        self._log.debug("starting")

        # A Thread object can only be started once, so any worker that has
        # already been started (ident is set) is replaced by a fresh one.
        # This is what makes start() after stop() work.
        self._workers = [
            worker if worker.ident is None else self._new_worker()
            for worker in self._workers
        ]

        self.listener_thread = threading.Thread(target=self._listen)
        self.listener_thread.start()
        for worker in self._workers:
            worker.start()

    def stop(self):
        """Stop listener and workers, then close all queued connections."""
        with self._lock:
            self._enabled = False

        # Wait for the listener first so that no new connection is queued
        # after the drain below and the listening socket is released before
        # a later start().  Never join the calling thread itself.
        listener = self.listener_thread
        if (listener is not None
                and listener is not threading.current_thread()
                and listener.is_alive()):
            self._log.debug("joining on listener")
            listener.join()

        self._log.debug("stopping workers")
        for worker in self._workers:
            worker.running = False

        self._log.debug("joining on workers")
        for worker in self._workers:
            if worker.is_alive():
                worker.join()

        # The queue is unbounded, so full() is never true; drain until it
        # is empty instead.
        while not self._connections.empty():
            connection = self._connections.get_nowait()
            try:
                connection.close()
            except socket.error as msg:
                self._log.error("error closing connection: %s", msg)

    def close_socket(self, msg):
        """Close the listening socket, if any, and log ``msg`` as the reason."""
        with self._lock:
            if self._socket is not None:
                try:
                    self._socket.shutdown(socket.SHUT_RD)
                except socket.error:
                    # shutdown() may be refused (e.g. the socket never got
                    # as far as listening); the socket must still be closed.
                    pass
                self._socket.close()
                self._socket = None
                self._log.info("Closed socket with msg={}".format(msg))

    def _listen(self):
        """Listener thread body: accept connections while enabled."""
        if self._listen_ip is None or self.listen_port is None:
            self._log.error("address not properly configured to listen")
            return

        self._log.info("listen for incomming connections")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._socket = sock
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # Binds to all interfaces; listen_ip is only checked for being
            # configured and is not used as the bind address.
            sock.bind(("0.0.0.0", self.listen_port))
            # The timeout bounds each accept() so the loop re-checks
            # ``enabled`` at least once per second.
            sock.settimeout(1)
            sock.listen(1)

            while self.enabled:
                try:
                    connection, address = sock.accept()
                except socket.timeout:
                    continue
                self._log.info("Accepted connection from {}".format(address))

                self._connections.put(connection)
        except socket.error as msg:
            self.close_socket(msg)
            return

        # Normal shutdown: release the port so a later start() can bind.
        self.close_socket("listener stopped")
225
class PongStatsHandler(tornado.web.RequestHandler):
    """Request handler that reports the pong request/response counters."""

    def initialize(self, pong_instance):
        """Store the pong instance whose counters are reported."""
        self._pong_instance = pong_instance

    def get(self):
        """Write both counters as a single dictionary response."""
        pong = self._pong_instance

        response = {}
        response['ping-request-rx-count'] = pong.request_count
        response['ping-response-tx-count'] = pong.response_count

        self.write(response)
235
236
class PongServerHandler(tornado.web.RequestHandler):
    """Request handler for reading and setting the pong listen address."""

    def initialize(self, pong_instance):
        """Store the pong instance this handler reads and configures."""
        self._pong_instance = pong_instance

    def get(self, args):
        """Write the configured listen ip and port as a dictionary."""
        response = {'ip': self._pong_instance.listen_ip,
                    'port': self._pong_instance.listen_port}

        self.write(response)

    def post(self, args):
        """Set the listen ip and port from a JSON body.

        The body must be a JSON object with an integer ``port`` and a string
        ``ip``.  Responds 405 when the Content-Type or the body is not
        acceptable, 404 when the URL target is not ``server`` and 200 on
        success.
        """
        target = get_url_target(self.request.uri)
        # A request without a Content-Type header yields None; treat it as
        # an empty string so it is rejected below instead of raising.
        body_header = self.request.headers.get("Content-Type") or ""

        if "json" not in body_header:
            self.write("Content-Type must be some kind of json")
            self.set_status(405)
            return

        try:
            # UnicodeDecodeError and JSON parse errors are both ValueError.
            json_dicts = json.loads(self.request.body.decode("utf-8"))
        except ValueError:
            self.write("Content-Type must be some kind of json")
            self.set_status(405)
            return

        if target != "server":
            self.set_status(404)
            return

        # Valid JSON need not be an object (e.g. a list or a number).
        if not isinstance(json_dicts, dict):
            self.set_status(405)
            return

        # .get() turns a missing key into None, which fails the type checks
        # below instead of raising KeyError.
        port = json_dicts.get('port')
        ip = json_dicts.get('ip')

        # Exact type check on purpose: bool is a subclass of int and must
        # not be accepted as a port.
        if type(port) is not int:
            self.set_status(405)
            return

        # type(u"") is ``unicode`` on Python 2 and ``str`` on Python 3.
        if not isinstance(ip, (str, type(u""))):
            self.set_status(405)
            return

        self._pong_instance.listen_ip = ip
        self._pong_instance.listen_port = port

        self.set_status(200)
282
class PongAdminStatusHandler(tornado.web.RequestHandler):
    """Request handler for reading and changing the pong admin state."""

    def initialize(self, pong_instance):
        """Store the pong instance this handler reports on and controls."""
        self._pong_instance = pong_instance

    def get(self, args):
        """Write ``{'adminstatus': 'enabled' | 'disabled'}`` for ``state``.

        Responds 404 for any other URL target.
        """
        target = get_url_target(self.request.uri)

        if target != "state":
            self.set_status(404)
            return

        value = "enabled" if self._pong_instance.enabled else "disabled"

        self.write({'adminstatus': value})

    def post(self, args):
        """Start or stop the pong instance from a JSON body.

        The body must be a JSON object with a boolean ``enable``.  Responds
        405 when the Content-Type or the body is not acceptable, 404 when
        the URL target is not ``state`` and 200 on success.
        """
        target = get_url_target(self.request.uri)
        # A request without a Content-Type header yields None; treat it as
        # an empty string so it is rejected below instead of raising.
        body_header = self.request.headers.get("Content-Type") or ""

        if "json" not in body_header:
            self.write("Content-Type must be some kind of json")
            self.set_status(405)
            return

        try:
            # UnicodeDecodeError and JSON parse errors are both ValueError.
            json_dicts = json.loads(self.request.body.decode("utf-8"))
        except ValueError:
            self.write("Content-Type must be some kind of json")
            self.set_status(405)
            return

        if target != "state":
            self.set_status(404)
            return

        # Valid JSON need not be an object (e.g. a list or a number).
        if not isinstance(json_dicts, dict):
            self.set_status(405)
            return

        # .get() turns a missing key into None, which fails the type check
        # below instead of raising KeyError.
        enable = json_dicts.get('enable')
        if type(enable) is not bool:
            self.set_status(405)
            return

        if enable:
            if not self._pong_instance.enabled:
                self._pong_instance.start()
        else:
            self._pong_instance.stop()

        self.set_status(200)
333
334