X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=skyquake%2Fframework%2Fcore%2Fapi_utils%2Fsockets.js;fp=skyquake%2Fframework%2Fcore%2Fapi_utils%2Fsockets.js;h=607659476d6339fc4a675a950b8cf1bebdfc1c0f;hb=e29efc315df33d546237e270470916e26df391d6;hp=0000000000000000000000000000000000000000;hpb=9c5e457509ba5a1822c316635c6308874e61b4b9;p=osm%2FUI.git diff --git a/skyquake/framework/core/api_utils/sockets.js b/skyquake/framework/core/api_utils/sockets.js new file mode 100644 index 000000000..607659476 --- /dev/null +++ b/skyquake/framework/core/api_utils/sockets.js @@ -0,0 +1,325 @@ +/* + * + * Copyright 2016 RIFT.IO Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +//SOCKET MANAGER +// test +//Supports localhost node polling subscriptions and pass through subscriptions to other websockets +// +//TODO REFACTOR: this needs to happen. there's too much boilerplate code in here. +//TODO Document after refactoring +//TODO Improved logging for debugging +//TODO List of URLS + +var WebSocket = require('ws'); +var Request = require('request'); +var _ = require('lodash'); +var constants = require('./constants.js'); +var Promise = require('promise'); +var url = require('url'); +var sockjs = require('sockjs'); +var websocket_multiplex = require('websocket-multiplex'); + + +function getPortForProtocol (protocol) { + switch (protocol) { + case 'http': + return 8000; + case 'https': + return 8443; + } +} + +var Subscriptions = function() { + this.ID = 0; + this.socketServers = {}; +}; + +Subscriptions.prototype.configure = function(config) { + this.config = config; + this.ready = true; + // 1. Setup SockJS server + var sockjs_opts = {}; + this.service = sockjs.createServer(sockjs_opts); + // 2. Setup multiplexing + this.multiplexer = new websocket_multiplex.MultiplexServer(this.service); + + this.service.installHandlers(this.config.httpServer, {prefix:'/multiplex'}); +} + +/** + * [subscribe description] + * @param {Object} req + * @param {String} req.body.url May be http, https, or ws + * @param {Function} req.body.transform A function that will transform + * the data before sending it out + * through the socket. Receives one + * argument, which is the data + * returned from the subscription. + * @param {Function} callback Function that will receive the SubscriptionData reference object + * @return {Object} SubscriptionReference An object containing the subscription information. + * @return {Number} SubscriptionReference.id The subscription ID + */ +Subscriptions.prototype.subscribe = function(req, callback) { + var self = this; + var URL = req.body.url; + var SubscriptionReference; + var sessionId = req.session.id; + var protocolTest = /^(.{2,5}):\/\//; + var protocol = URL.match(protocolTest); + + if (!protocol) { + var origin = ''; + if (req.query['api_server']) { + var api_server_protocol = req.query['api_server'].match(protocolTest)[1]; + var api_server_origin = req.query['api_server'] + ':' + getPortForProtocol(api_server_protocol); + origin = api_server_origin; + protocol = api_server_protocol; + } else { + // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE + // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC + // No protocol was passed with the url in the body. Assume req.protocol is protocol and construct URL + protocol = req.protocol || 'https'; + // Converting relative URL to full path. + origin = protocol + '://' + req.headers.host + } + var a = url.resolve(origin, req.baseUrl); + var b = url.resolve(a, URL); + URL = b; + console.log('DEBUG URL IS', URL); + } else { + protocol = protocol[1] + } + + return new Promise(function(resolve, reject) { + + if (!self.ready) { + return reject({ + statusCode: 500, + errorMessage: 'SocketManager not configured yet. Cannot proceed' + }) + } + + self.createWebSocketServer().then(function(successData) { + + self.socketServers[sessionId + successData.id] = successData; + self.setUpSocketInstance(protocol, URL, req, self.socketServers[sessionId + successData.id].wss, successData.id); + return resolve({ + statusCode: 200, + data: { + id: self.socketServers[sessionId + successData.id].id + } + }); + }, + function(errorData) { + return reject({ + statusCode: 503, + errorMessage: errorData.error + }); + }); + }); +}; + +Subscriptions.prototype.setUpSocketInstance = function(protocol, URL, req, wss, channelId) { + var self = this; + //Need to refactor this to make it more scalable/dynamic + switch (protocol) { + case 'http': + self.socketInstance(URL, req, wss, PollingSocket, channelId); + break; + case 'https': + self.socketInstance(URL, req, wss, PollingSocket, channelId); + break; + case 'ws': + self.socketInstance(URL, req, wss, WebSocket, channelId); + break; + case 'wss': + self.socketInstance(URL, req, wss, WebSocket, channelId); + break; + } +} + +Subscriptions.prototype.createWebSocketServer = function() { + var self = this; + + return new Promise(function(resolve, reject) { + var wss = null; + + self.ID++; + + wss = self.multiplexer.registerChannel(self.ID); + + return resolve({ + id: self.ID, + wss: wss + }); + }); +}; + +Subscriptions.prototype.socketInstance = function(url, req, wss, Type, channelId) { + console.log('Creating a new socketInstance for:', url, 'sessionId:', req.session.id); + var self = this; + var Socket = null; + var Connections = []; + var Index = 0; + var sessionId = req.session.id; + var wssRef = wss; + var channelIdRef = channelId; + wss.on('connection', function(conn) { + console.log('New connection to multiplex-server for channelId', channelIdRef); + + conn.on('data', function(msg) { + console.log('Test purposes only. Received message from client:', msg); + conn.write('Test purposes only. Echo: ' + msg); + }); + + if (!Socket) { + if (Type == PollingSocket) { + Socket = new Type(url, req, 1000, req.body); + } else { + Socket = new Type(url); + } + console.log('Socket assigned for url', url); + } + conn.index = Index++; + // Add this client-connection into list of connections for this channelId/wss + Connections.push(conn); + + conn.on('close', function() { + // Remove the browser connection from list of Connections for this channelId/wss + Connections.splice(conn.index, 1); + console.log('splicing conn.index', conn.index,' for channel', channelIdRef); + + // Check if no other connections exist + if (Connections.length == 0) { + console.log('No more connections for', channelId, '. Will close socket server and downstream socket/poller.'); + try { + // Close downstream socket/poller + Socket.close(); + + // Close socket server + conn.end(); + + // Remove from list of socketServers + delete self.socketServers[sessionId + wss.id]; + + // There is no unregisterChannel. Assuming + // sockjs/websocket-multiplex do the right + // things and cleanup after themselves. + } catch (e) { + console.log('Error closing socket server: ', e); + } + Index = 0; + delete Socket; + } + }); + + Socket.onopen = function() { + console.log('Opened a websocket to southbound server'); + }; + + Socket.onerror = function(error) { + console.log('Error on southbound connection. Error:', error); + } + + Socket.onmessage = function(data) { + var i; + var self = this; + if (req.body.transform && req.body.transform.constructor.name == "String") { + //someTransformObject[req.body.transform](data, send) + //req.body.transform(data, send); + } else { + if (Type == PollingSocket) { + send(data); + } else { + send(data.data); + } + } + + function send(payload) { + var is401 = false; + try { + if (typeof payload == 'string') { + var jsonPayload = JSON.parse(payload); + is401 = jsonPayload.statusCode == 401; + } + else { + is401 = payload.statusCode == 401; + } + } catch(e) { + payload = {} + } + + for (i = Connections.length - 1; i >= 0; i -= 1) { + // console.log('Sending payload to channelId:', channelId, ' on connection', i); + Connections[i].write(payload); + }; + if (is401) { + try { + Socket.close(); + } catch (e) { + console.log('Error closing Socket') + } + } + } + + }; + }); +}; + +function PollingSocket(url, req, interval, config) { + console.log('Creating a new PollingSocket for url', url, 'sessionId:', req.session.id); + var self = this; + self.isClosed = false; + var requestHeaders = {}; + _.extend(requestHeaders, { + 'Authorization': req.get('Authorization') + }); + + var pollServer = function() { + Request({ + url: url, + method: config.method || 'GET', + headers: requestHeaders, + json: config.payload, + rejectUnauthorized: false, + forever: constants.FOREVER_ON + }, function(error, response, body) { + if (error) { + console.log('Error polling: ' + url); + } else { + if (!self.isClosed) { + self.poll = setTimeout(pollServer, 1000 || interval); + var data = response.body; + if (self.onmessage) { + self.onmessage(data); + } + } + } + }); + }; + pollServer(); +}; + +PollingSocket.prototype.close = function() { + console.log('Closing PollingSocket'); + var self = this; + this.isClosed = true; + clearTimeout(self.poll); +}; + + +module.exports = Subscriptions;