Rift.IO OSM R1 Initial Submission
[osm/UI.git] / skyquake / framework / core / api_utils / sockets.js
diff --git a/skyquake/framework/core/api_utils/sockets.js b/skyquake/framework/core/api_utils/sockets.js
new file mode 100644 (file)
index 0000000..6076594
--- /dev/null
@@ -0,0 +1,325 @@
+/*
+ * 
+ *   Copyright 2016 RIFT.IO Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+//SOCKET MANAGER
+// test
+//Supports localhost node polling subscriptions and pass through subscriptions to other websockets
+//
+//TODO REFACTOR: this needs to happen. there's too much boilerplate code in here.
+//TODO Document after refactoring
+//TODO Improved logging for debugging
+//TODO List of URLS
+
+var WebSocket = require('ws');
+var Request = require('request');
+var _ = require('lodash');
+var constants = require('./constants.js');
+var Promise = require('promise');
+var url = require('url');
+var sockjs = require('sockjs');
+var websocket_multiplex = require('websocket-multiplex');
+
+
+function getPortForProtocol (protocol) {
+  switch (protocol) {
+    case 'http':
+      return 8000;
+    case 'https':
+      return 8443;
+  }
+}
+
+var Subscriptions = function() {
+  this.ID = 0;
+  this.socketServers = {};
+};
+
+Subscriptions.prototype.configure = function(config) {
+  this.config = config;
+  this.ready = true;
+  // 1. Setup SockJS server
+  var sockjs_opts = {};
+  this.service = sockjs.createServer(sockjs_opts);
+  // 2. Setup multiplexing
+  this.multiplexer = new websocket_multiplex.MultiplexServer(this.service);
+
+  this.service.installHandlers(this.config.httpServer, {prefix:'/multiplex'});
+}
+
+/**
+ * [subscribe description]
+ * @param  {Object}   req
+ * @param  {String}   req.body.url May be http, https, or ws
+ * @param  {Function} req.body.transform A function that will transform
+ *                                      the data before sending it out
+ *                                      through the socket. Receives one
+ *                                      argument, which is the data
+ *                                      returned from the subscription.
+ * @param  {Function} callback Function that will receive the SubscriptionData reference object
+ * @return {Object}   SubscriptionReference  An object containing the subscription information.
+ * @return {Number} SubscriptionReference.id The subscription ID
+ */
+Subscriptions.prototype.subscribe = function(req, callback) {
+  var self = this;
+  var URL = req.body.url;
+  var SubscriptionReference;
+  var sessionId = req.session.id;
+  var protocolTest = /^(.{2,5}):\/\//;
+  var protocol = URL.match(protocolTest);
+
+  if (!protocol) {
+    var origin = '';
+    if (req.query['api_server']) {
+      var api_server_protocol = req.query['api_server'].match(protocolTest)[1];
+      var api_server_origin = req.query['api_server'] + ':' + getPortForProtocol(api_server_protocol);
+      origin = api_server_origin;
+      protocol = api_server_protocol;
+    } else {
+      // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE
+      // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC
+      // No protocol was passed with the url in the body. Assume req.protocol is protocol and construct URL
+      protocol = req.protocol || 'https';
+      // Converting relative URL to full path.
+      origin = protocol + '://' + req.headers.host
+    }
+    var a = url.resolve(origin, req.baseUrl);
+    var b = url.resolve(a, URL);
+    URL = b;
+    console.log('DEBUG URL IS', URL);
+  } else {
+    protocol = protocol[1]
+  }
+
+  return new Promise(function(resolve, reject) {
+
+    if (!self.ready) {
+      return reject({
+        statusCode: 500,
+        errorMessage: 'SocketManager not configured yet. Cannot proceed'
+      })
+    }
+
+    self.createWebSocketServer().then(function(successData) {
+
+        self.socketServers[sessionId + successData.id] = successData;
+        self.setUpSocketInstance(protocol, URL, req, self.socketServers[sessionId + successData.id].wss, successData.id);
+        return resolve({
+          statusCode: 200,
+          data: {
+            id: self.socketServers[sessionId + successData.id].id
+          }
+        });
+      },
+      function(errorData) {
+        return reject({
+          statusCode: 503,
+          errorMessage: errorData.error
+        });
+      });
+  });
+};
+
+Subscriptions.prototype.setUpSocketInstance = function(protocol, URL, req, wss, channelId) {
+  var self = this;
+  //Need to refactor this to make it more scalable/dynamic
+  switch (protocol) {
+    case 'http':
+      self.socketInstance(URL, req, wss, PollingSocket, channelId);
+      break;
+    case 'https':
+      self.socketInstance(URL, req, wss, PollingSocket, channelId);
+      break;
+    case 'ws':
+      self.socketInstance(URL, req, wss, WebSocket, channelId);
+      break;
+    case 'wss':
+      self.socketInstance(URL, req, wss, WebSocket, channelId);
+      break;
+  }
+}
+
+Subscriptions.prototype.createWebSocketServer = function() {
+  var self = this;
+
+  return new Promise(function(resolve, reject) {
+    var wss = null;
+
+    self.ID++;
+
+    wss = self.multiplexer.registerChannel(self.ID);
+
+    return resolve({
+      id: self.ID,
+      wss: wss
+    });
+  });
+};
+
+Subscriptions.prototype.socketInstance = function(url, req, wss, Type, channelId) {
+  console.log('Creating a new socketInstance for:', url, 'sessionId:', req.session.id);
+  var self = this;
+  var Socket = null;
+  var Connections = [];
+  var Index = 0;
+  var sessionId = req.session.id;
+  var wssRef = wss;
+  var channelIdRef = channelId;
+  wss.on('connection', function(conn) {
+    console.log('New connection to multiplex-server for channelId', channelIdRef);
+
+    conn.on('data', function(msg) {
+      console.log('Test purposes only. Received message from client:', msg);
+      conn.write('Test purposes only. Echo: ' + msg);
+    });
+
+    if (!Socket) {
+      if (Type == PollingSocket) {
+        Socket = new Type(url, req, 1000, req.body);
+      } else {
+        Socket = new Type(url);
+      }
+      console.log('Socket assigned for url', url);
+    }
+    conn.index = Index++;
+    // Add this client-connection into list of connections for this channelId/wss
+    Connections.push(conn);
+
+    conn.on('close', function() {
+      // Remove the browser connection from list of Connections for this channelId/wss
+      Connections.splice(conn.index, 1);
+      console.log('splicing conn.index', conn.index,' for channel', channelIdRef);
+
+      // Check if no other connections exist
+      if (Connections.length == 0) {
+        console.log('No more connections for', channelId, '. Will close socket server and downstream socket/poller.');
+        try {
+          // Close downstream socket/poller
+          Socket.close();
+
+          // Close socket server
+          conn.end();
+
+          // Remove from list of socketServers
+          delete self.socketServers[sessionId + wss.id];
+
+          // There is no unregisterChannel. Assuming
+          // sockjs/websocket-multiplex do the right
+          // things and cleanup after themselves.
+        } catch (e) {
+          console.log('Error closing socket server: ', e);
+        }
+        Index = 0;
+        delete Socket;
+      }
+    });
+
+    Socket.onopen = function() {
+      console.log('Opened a websocket to southbound server');
+    };
+
+    Socket.onerror = function(error) {
+      console.log('Error on southbound connection. Error:', error);
+    }
+
+    Socket.onmessage = function(data) {
+      var i;
+      var self = this;
+      if (req.body.transform && req.body.transform.constructor.name == "String") {
+        //someTransformObject[req.body.transform](data, send)
+        //req.body.transform(data, send);
+      } else {
+        if (Type == PollingSocket) {
+          send(data);
+        } else {
+          send(data.data);
+        }
+      }
+
+      function send(payload) {
+        var is401 = false;
+        try {
+          if (typeof payload == 'string') {
+            var jsonPayload = JSON.parse(payload);
+            is401 = jsonPayload.statusCode == 401;
+          }
+          else {
+            is401 = payload.statusCode == 401;
+          }
+        } catch(e) {
+          payload = {}
+        }
+
+        for (i = Connections.length - 1; i >= 0; i -= 1) {
+          // console.log('Sending payload to channelId:', channelId, ' on connection', i);
+          Connections[i].write(payload);
+        };
+        if (is401) {
+          try {
+            Socket.close();
+          } catch (e) {
+            console.log('Error closing Socket')
+          }
+        }
+      }
+
+    };
+  });
+};
+
+function PollingSocket(url, req, interval, config) {
+  console.log('Creating a new PollingSocket for url', url, 'sessionId:', req.session.id);
+  var self = this;
+  self.isClosed = false;
+  var requestHeaders = {};
+  _.extend(requestHeaders, {
+    'Authorization': req.get('Authorization')
+  });
+
+  var pollServer = function() {
+    Request({
+      url: url,
+      method: config.method || 'GET',
+      headers: requestHeaders,
+      json: config.payload,
+      rejectUnauthorized: false,
+      forever: constants.FOREVER_ON
+    }, function(error, response, body) {
+      if (error) {
+        console.log('Error polling: ' + url);
+      } else {
+        if (!self.isClosed) {
+          self.poll = setTimeout(pollServer, 1000 || interval);
+          var data = response.body;
+          if (self.onmessage) {
+            self.onmessage(data);
+          }
+        }
+      }
+    });
+  };
+  pollServer();
+};
+
+PollingSocket.prototype.close = function() {
+  console.log('Closing PollingSocket');
+  var self = this;
+  this.isClosed = true;
+  clearTimeout(self.poll);
+};
+
+
+module.exports = Subscriptions;