3 * Copyright 2016 RIFT.IO Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 //Supports localhost node polling subscriptions and pass through subscriptions to other websockets
22 //TODO REFACTOR: this needs to happen. there's too much boilerplate code in here.
23 //TODO Document after refactoring
24 //TODO Improved logging for debugging
27 var WebSocket
= require('ws');
28 var Request
= require('request');
29 var _
= require('lodash');
30 var constants
= require('./constants.js');
31 var Promise
= require('promise');
32 var url
= require('url');
33 var sockjs
= require('sockjs');
34 var websocket_multiplex
= require('websocket-multiplex');
37 function getPortForProtocol (protocol
) {
46 var Subscriptions = function() {
48 this.socketServers
= {};
51 Subscriptions
.prototype.configure = function(config
) {
54 // 1. Setup SockJS server
56 this.service
= sockjs
.createServer(sockjs_opts
);
57 // 2. Setup multiplexing
58 this.multiplexer
= new websocket_multiplex
.MultiplexServer(this.service
);
60 this.service
.installHandlers(this.config
.httpServer
, {prefix
:'/multiplex'});
64 * [subscribe description]
66 * @param {String} req.body.url May be http, https, or ws
67 * @param {Function} req.body.transform A function that will transform
68 * the data before sending it out
69 * through the socket. Receives one
70 * argument, which is the data
71 * returned from the subscription.
72 * @param {Function} callback Function that will receive the SubscriptionData reference object
73 * @return {Object} SubscriptionReference An object containing the subscription information.
74 * @return {Number} SubscriptionReference.id The subscription ID
76 Subscriptions
.prototype.subscribe = function(req
, callback
) {
78 var URL
= req
.body
.url
;
79 var SubscriptionReference
;
80 var sessionId
= req
.session
.id
;
81 var protocolTest
= /^(.{2,5}):\/\//;
82 var protocol
= URL
.match(protocolTest
);
86 if (req
.query
['api_server']) {
87 var api_server_protocol
= req
.query
['api_server'].match(protocolTest
)[1];
88 var api_server_origin
= req
.query
['api_server'] + ':' + getPortForProtocol(api_server_protocol
);
89 origin
= api_server_origin
;
90 protocol
= api_server_protocol
;
92 // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE
93 // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC
94 // No protocol was passed with the url in the body. Assume req.protocol is protocol and construct URL
95 protocol
= req
.protocol
|| 'https';
96 // Converting relative URL to full path.
97 origin
= protocol
+ '://' + req
.headers
.host
99 var a
= url
.resolve(origin
, req
.baseUrl
);
100 var b
= url
.resolve(a
, URL
);
102 console
.log('DEBUG URL IS', URL
);
104 protocol
= protocol
[1]
107 return new Promise(function(resolve
, reject
) {
112 errorMessage
: 'SocketManager not configured yet. Cannot proceed'
116 self
.createWebSocketServer().then(function(successData
) {
118 self
.socketServers
[sessionId
+ successData
.id
] = successData
;
119 self
.setUpSocketInstance(protocol
, URL
, req
, self
.socketServers
[sessionId
+ successData
.id
].wss
, successData
.id
);
123 id
: self
.socketServers
[sessionId
+ successData
.id
].id
127 function(errorData
) {
130 errorMessage
: errorData
.error
136 Subscriptions
.prototype.setUpSocketInstance = function(protocol
, URL
, req
, wss
, channelId
) {
138 //Need to refactor this to make it more scalable/dynamic
141 self
.socketInstance(URL
, req
, wss
, PollingSocket
, channelId
);
144 self
.socketInstance(URL
, req
, wss
, PollingSocket
, channelId
);
147 self
.socketInstance(URL
, req
, wss
, WebSocket
, channelId
);
150 self
.socketInstance(URL
, req
, wss
, WebSocket
, channelId
);
155 Subscriptions
.prototype.createWebSocketServer = function() {
158 return new Promise(function(resolve
, reject
) {
163 wss
= self
.multiplexer
.registerChannel(self
.ID
);
172 Subscriptions
.prototype.socketInstance = function(url
, req
, wss
, Type
, channelId
) {
173 console
.log('Creating a new socketInstance for:', url
, 'sessionId:', req
.session
.id
);
176 var Connections
= [];
178 var sessionId
= req
.session
.id
;
180 var channelIdRef
= channelId
;
181 wss
.on('connection', function(conn
) {
182 console
.log('New connection to multiplex-server for channelId', channelIdRef
);
184 conn
.on('data', function(msg
) {
185 console
.log('Test purposes only. Received message from client:', msg
);
186 conn
.write('Test purposes only. Echo: ' + msg
);
190 if (Type
== PollingSocket
) {
191 Socket
= new Type(url
, req
, 1000, req
.body
);
193 Socket
= new Type(url
);
195 console
.log('Socket assigned for url', url
);
197 conn
.index
= Index
++;
198 // Add this client-connection into list of connections for this channelId/wss
199 Connections
.push(conn
);
201 conn
.on('close', function() {
202 // Remove the browser connection from list of Connections for this channelId/wss
203 Connections
.splice(conn
.index
, 1);
204 console
.log('splicing conn.index', conn
.index
,' for channel', channelIdRef
);
206 // Check if no other connections exist
207 if (Connections
.length
== 0) {
208 console
.log('No more connections for', channelId
, '. Will close socket server and downstream socket/poller.');
210 // Close downstream socket/poller
213 // Close socket server
216 // Remove from list of socketServers
217 delete self
.socketServers
[sessionId
+ wss
.id
];
219 // There is no unregisterChannel. Assuming
220 // sockjs/websocket-multiplex do the right
221 // things and cleanup after themselves.
223 console
.log('Error closing socket server: ', e
);
230 Socket
.onopen = function() {
231 console
.log('Opened a websocket to southbound server');
234 Socket
.onerror = function(error
) {
235 console
.log('Error on southbound connection. Error:', error
);
238 Socket
.onmessage = function(data
) {
241 if (req
.body
.transform
&& req
.body
.transform
.constructor.name
== "String") {
242 //someTransformObject[req.body.transform](data, send)
243 //req.body.transform(data, send);
245 if (Type
== PollingSocket
) {
252 function send(payload
) {
255 if (typeof payload
== 'string') {
256 var jsonPayload
= JSON
.parse(payload
);
257 is401
= jsonPayload
.statusCode
== 401;
260 is401
= payload
.statusCode
== 401;
266 for (i
= Connections
.length
- 1; i
>= 0; i
-= 1) {
267 // console.log('Sending payload to channelId:', channelId, ' on connection', i);
268 Connections
[i
].write(payload
);
274 console
.log('Error closing Socket')
283 function PollingSocket(url
, req
, interval
, config
) {
284 console
.log('Creating a new PollingSocket for url', url
, 'sessionId:', req
.session
.id
);
286 self
.isClosed
= false;
287 var requestHeaders
= {};
288 _
.extend(requestHeaders
, {
289 'Authorization': req
.get('Authorization')
292 var pollServer = function() {
295 method
: config
.method
|| 'GET',
296 headers
: requestHeaders
,
297 json
: config
.payload
,
298 rejectUnauthorized
: false,
299 forever
: constants
.FOREVER_ON
300 }, function(error
, response
, body
) {
302 console
.log('Error polling: ' + url
);
304 if (!self
.isClosed
) {
305 self
.poll
= setTimeout(pollServer
, 1000 || interval
);
306 var data
= response
.body
;
307 if (self
.onmessage
) {
308 self
.onmessage(data
);
317 PollingSocket
.prototype.close = function() {
318 console
.log('Closing PollingSocket');
320 this.isClosed
= true;
321 clearTimeout(self
.poll
);
325 module
.exports
= Subscriptions
;