RIFT OSM R1 Initial Submission
[osm/SO.git] / examples / ping_pong_ns / ping_pong_ns / pong.py
diff --git a/examples/ping_pong_ns/ping_pong_ns/pong.py b/examples/ping_pong_ns/ping_pong_ns/pong.py
new file mode 100644 (file)
index 0000000..ee5c2d2
--- /dev/null
@@ -0,0 +1,334 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+from datetime import date
+from Queue import Queue
+import logging
+import json
+import socket
+import threading
+import time
+
+import tornado.web
+
+from util.util import get_url_target
+class Stats(object):
+    def __init__(self):
+        self._request_count = 0
+        self._response_count = 0
+
+        self._lock = threading.Lock()
+
+    @property
+    def request_count(self):
+        with self._lock:
+            return self._request_count
+
+    @request_count.setter
+    def request_count(self, value):
+        with self._lock:
+            self._request_count = value
+
+    @property
+    def response_count(self):
+        with self._lock:
+            return self._response_count
+
+    @response_count.setter
+    def response_count(self, value):
+        with self._lock:
+            self._response_count = value
+        
+class Worker(threading.Thread):
+    def __init__(self, log, connections, stats):
+        super(Worker, self).__init__()
+        self._log = log
+        self._connections = connections
+        self._stats = stats
+
+        self._running = True
+
+        self._lock = threading.Lock()
+        
+    @property
+    def running(self):
+        return self._running
+
+    @running.setter
+    def running(self, value):
+        self._running = value
+
+    def run(self):
+        while self.running:
+            try:
+                connection = self._connections.get_nowait()
+            except:
+                continue
+            
+            try:
+                req = connection.recv(1024)
+            except socket.error as msg:
+                self._log.error("error with connection read: " % msg)
+                self._connections.put(connection)
+                continue
+
+            if not req:
+                self._connections.put(connection)
+                continue
+
+            resp = req.decode('UTF-8')
+            self._log.debug("got: %s", resp)
+
+            self._stats.request_count += 1
+
+            try:
+                connection.sendall(resp)
+                self._stats.response_count += 1
+            except socket.error as msg:
+                self._log.error("error with connection read: " % msg)
+                self._connections.put(connection)
+                continue
+
+            self._connections.put(connection)        
+
+class Pong(object):
+    def __init__(self, worker_count=5):
+        self._log = logging.getLogger("pong")
+        self._log.setLevel(logging.DEBUG)
+
+        self.listen_ip = None
+        self.listen_port = None
+
+        self._lock = threading.Lock()
+
+        self._connections = Queue()
+        
+        self._stats = Stats()
+
+        self._workers = list()
+
+        self._enabled = False
+
+        for _ in range(worker_count):
+            self._workers.append(Worker(self._log, self._connections, self._stats))
+
+    @property
+    def listen_port(self):
+        return self._listen_port
+
+    @listen_port.setter
+    def listen_port(self, value):
+        self._log.debug("new listen port: %s" % value)
+        self._listen_port = value
+
+    @property
+    def listen_ip(self):
+        return self._listen_ip
+
+    @listen_ip.setter
+    def listen_ip(self, value):
+        self._log.debug("listen pong ip: %s" % value)
+        self._listen_ip = value
+
+
+    @property
+    def enabled(self):
+        with self._lock:
+            return self._enabled
+
+    @property
+    def request_count(self):
+        return self._stats.request_count
+
+    @property
+    def response_count(self):
+        return self._stats.response_count
+
+    def start(self):
+        self._log.debug("starting")
+        self._enabled = True
+        self.listener_thread = threading.Thread(target=self._listen)
+        self.listener_thread.start()
+        for worker in self._workers:
+            worker.start()
+
+    def stop(self):
+        with self._lock:
+            self._enabled = False
+
+            self._log.debug("stopping workers")
+            for worker in self._workers:
+                worker.running = False
+
+            self._log.debug("joining on workers")
+            for worker in self._workers:
+                if worker.is_alive():
+                    worker.join()
+
+            while self._connections.full():
+                try:
+                    connection = self._connections.get_nowait()
+                    connection.close()
+                except:
+                    pass
+
+    def close_socket(self, msg):
+        with self._lock:
+            if self._socket != None:
+                self._socket.shutdown(socket.SHUT_RD)
+                self._socket.close()
+                self._socket = None
+                self._log.info("Closed socket with msg={}".format(msg))
+
+    def _listen(self):
+        if self._listen_ip is None or self.listen_port is None:
+            self._log.error("address not properly configured to listen")
+            return
+
+        self._log.info("listen for incomming connections")
+        try:
+            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+            # self._socket.bind((self.listen_ip, self.listen_port))
+            self._socket.bind(("0.0.0.0", self.listen_port))
+            self._socket.settimeout(1)
+
+            while self.enabled:
+                
+                try:
+                    self._socket.listen(1)
+                    connection, address = self._socket.accept()
+                except socket.timeout:
+                    continue
+                self._log.info("Accepted connection from {}".format(address))
+
+                self._connections.put(connection)
+            else:
+                self.stop()
+        except socket.error as msg:
+            self.close_socket(msg)
+
+class PongStatsHandler(tornado.web.RequestHandler):
+    def initialize(self, pong_instance):
+        self._pong_instance = pong_instance
+
+    def get(self):
+        response = {'ping-request-rx-count': self._pong_instance.request_count,
+                    'ping-response-tx-count': self._pong_instance.response_count}
+
+        self.write(response)
+
+
+class PongServerHandler(tornado.web.RequestHandler):
+    def initialize(self, pong_instance):
+        self._pong_instance = pong_instance
+
+    def get(self, args):
+        response = {'ip': self._pong_instance.listen_ip,
+                    'port': self._pong_instance.listen_port}
+
+        self.write(response)
+
+    def post(self, args):
+        target = get_url_target(self.request.uri)
+        body = self.request.body.decode("utf-8")
+        body_header = self.request.headers.get("Content-Type")
+
+        if "json" not in body_header:
+            self.write("Content-Type must be some kind of json")
+            self.set_status(405)
+            return
+
+        try:
+            json_dicts = json.loads(body)
+        except:
+            self.write("Content-Type must be some kind of json")
+            self.set_status(405)
+            return
+
+        if target == "server":
+
+            if type(json_dicts['port']) is not int:
+                self.set_status(405)
+                return
+
+            if type(json_dicts['ip']) not in (str, unicode):
+                self.set_status(405)
+                return
+
+            self._pong_instance.listen_ip = json_dicts['ip']
+            self._pong_instance.listen_port = json_dicts['port']
+
+        else:
+            self.set_status(404)
+            return
+
+        self.set_status(200)
+
+class PongAdminStatusHandler(tornado.web.RequestHandler):
+    def initialize(self, pong_instance):
+        self._pong_instance = pong_instance
+
+    def get(self, args):
+        target = get_url_target(self.request.uri)
+        
+        if target == "state":
+            value = "enabled" if self._pong_instance.enabled else "disabled"
+
+            response = { 'adminstatus': value }
+        else:
+            self.set_status(404)
+            return
+
+        self.write(response)
+
+    def post(self, args):
+        target = get_url_target(self.request.uri)
+        body = self.request.body.decode("utf-8")
+        body_header = self.request.headers.get("Content-Type")
+
+        if "json" not in body_header:
+            self.write("Content-Type must be some kind of json")
+            self.set_status(405)            
+            return
+            
+        try:
+            json_dicts = json.loads(body)
+        except:
+            self.write("Content-Type must be some kind of json")
+            self.set_status(405)            
+            return
+
+        if target == "state":
+            if type(json_dicts['enable']) is not bool:
+                self.set_status(405)            
+                return
+
+            if json_dicts['enable']:
+                if not self._pong_instance.enabled:
+                    self._pong_instance.start()
+            else:
+                self._pong_instance.stop()
+
+        else:
+            self.set_status(404)
+            return
+
+        self.set_status(200)
+
+