/*
 * Copyright 2016 RIFT.IO Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// Supports localhost node polling subscriptions and pass-through subscriptions to other websockets.
//
// TODO(refactor): this needs to happen; there is too much boilerplate code in here.
// TODO: document after refactoring.
// TODO: improve logging for debugging.
27 var WebSocket
= require('ws');
28 var Request
= require('request');
29 var _
= require('lodash');
30 var constants
= require('./constants.js');
31 var Promise
= require('promise');
32 var url
= require('url');
33 var sockjs
= require('sockjs');
34 var websocket_multiplex
= require('websocket-multiplex');
35 var utils
= require('./utils.js');
// Constructor for the subscription manager.
// `socketServers` is the registry that subscribe() fills (keyed by
// session id + channel id) and that socketInstance() prunes when the last
// client of a channel disconnects.
// NOTE(review): other methods read this.ID and this.config, but no
// initialisation of either is visible in this block — confirm where they are set.
38 var Subscriptions = function() {
40 this.socketServers
= {};
// Wires the manager to an HTTP server: creates the SockJS service, wraps it
// in a websocket-multiplex MultiplexServer (stored as this.multiplexer, which
// createWebSocketServer() uses to register channels), and installs the SockJS
// handlers on this.config.httpServer under the '/multiplex' prefix.
// @param {Object} config  presumably the object later read as this.config
//                         (must then provide `httpServer`) — TODO confirm
// NOTE(review): neither the definition of `sockjs_opts` nor the assignment of
// `this.config` is visible in this block — confirm both exist before these calls.
43 Subscriptions
.prototype.configure = function(config
) {
46 // 1. Setup SockJS server
48 this.service
= sockjs
.createServer(sockjs_opts
);
49 // 2. Setup multiplexing
50 this.multiplexer
= new websocket_multiplex
.MultiplexServer(this.service
);
// Handlers are attached to the service, not the multiplexer; clients connect on '/multiplex'.
52 this.service
.installHandlers(this.config
.httpServer
, {prefix
:'/multiplex'});
/**
 * Opens a subscription for the URL given in the request body.
 *
 * @param {String} req.body.url May be http, https, or ws
 * @param {Function} req.body.transform A function that will transform
 *                                      the data before sending it out
 *                                      through the socket. Receives one
 *                                      argument, which is the data
 *                                      returned from the subscription.
 * @param {Function} callback Function that will receive the SubscriptionData reference object
 * @return {Object} SubscriptionReference An object containing the subscription information.
 * @return {Number} SubscriptionReference.id The subscription ID
 */
// Sets up a subscription for req.body.url: determines the protocol and an
// absolute URL, then returns a Promise that creates a multiplexer channel
// (createWebSocketServer), records it in this.socketServers under
// sessionId + channel id, and attaches the downstream socket/poller through
// setUpSocketInstance.
// NOTE(review): `callback` is never referenced in the visible body, and
// `SubscriptionReference` is declared but not used in the visible lines.
// NOTE(review): `self` and `origin` are used below without a visible
// declaration — confirm both are declared locally.
68 Subscriptions
.prototype.subscribe = function(req
, callback
) {
70 var URL
= req
.body
.url
;
71 var SubscriptionReference
;
72 var sessionId
= req
.session
.id
;
// Captures the scheme (2-5 characters) in front of '://'. `protocol` is the
// match array, or null when the URL carries no scheme.
73 var protocolTest
= /^(.{2,5}):\/\//;
74 var protocol
= URL
.match(protocolTest
);
// api_server query override: the scheme is taken from the api_server value and
// the origin becomes that value + ':' + utils.getPortForProtocol(scheme).
// NOTE(review): .match(protocolTest)[1] throws a TypeError when api_server has
// no scheme — confirm callers always send one, or guard the null match.
78 if (req
.query
['api_server']) {
79 var api_server_protocol
= req
.query
['api_server'].match(protocolTest
)[1];
80 var api_server_origin
= req
.query
['api_server'] + ':' + utils
.getPortForProtocol(api_server_protocol
);
81 origin
= api_server_origin
;
82 protocol
= api_server_protocol
;
84 // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE
85 // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC
86 // No protocol was passed with the url in the body. Assume req.protocol is protocol and construct URL
// Falls back to 'https' when req.protocol is unset.
87 protocol
= req
.protocol
|| 'https';
88 // Converting relative URL to full path.
89 origin
= protocol
+ '://' + req
.headers
.host
// Resolve req.baseUrl against the origin, then the requested URL against that result.
// NOTE(review): the assignment of `b` back to URL is not visible here, yet the
// log below prints URL — confirm URL is updated from `b`.
91 var a
= url
.resolve(origin
, req
.baseUrl
);
92 var b
= url
.resolve(a
, URL
);
94 console
.log('DEBUG URL IS', URL
);
// Reduce the match array to the captured scheme string.
96 protocol
= protocol
[1]
99 return new Promise(function(resolve
, reject
) {
// NOTE(review): error payload for an unconfigured manager; the condition that
// triggers it is not visible in this block.
104 errorMessage
: 'SocketManager not configured yet. Cannot proceed'
// Create the channel, then remember it and start the downstream socket for it.
108 self
.createWebSocketServer().then(function(successData
) {
// Registry key is session id + channel id, so the same channel id in another session does not collide.
110 self
.socketServers
[sessionId
+ successData
.id
] = successData
;
111 self
.setUpSocketInstance(protocol
, URL
, req
, self
.socketServers
[sessionId
+ successData
.id
].wss
, successData
.id
);
// The id reported back is the channel id stored in the registry entry above.
115 id
: self
.socketServers
[sessionId
+ successData
.id
].id
// Failure path of createWebSocketServer(): errorData.error is forwarded as errorMessage.
119 function(errorData
) {
122 errorMessage
: errorData
.error
// Chooses the downstream socket implementation for a channel and delegates to
// socketInstance(URL, req, wss, Type, channelId).
// The visible calls pass PollingSocket twice and WebSocket twice.
// NOTE(review): the branching on `protocol` is not visible in this block —
// presumably http/https select PollingSocket and ws/wss select WebSocket; confirm.
// NOTE(review): `self` is used without a visible declaration — confirm it is
// bound to the Subscriptions instance.
128 Subscriptions
.prototype.setUpSocketInstance = function(protocol
, URL
, req
, wss
, channelId
) {
130 //Need to refactor this to make it more scalable/dynamic
133 self
.socketInstance(URL
, req
, wss
, PollingSocket
, channelId
);
136 self
.socketInstance(URL
, req
, wss
, PollingSocket
, channelId
);
139 self
.socketInstance(URL
, req
, wss
, WebSocket
, channelId
);
142 self
.socketInstance(URL
, req
, wss
, WebSocket
, channelId
);
// Registers a new channel on the multiplexer (created in configure()) using
// self.ID as the channel name, and returns a Promise.
// subscribe() reads `.id` and `.wss` from the fulfilled value.
// NOTE(review): the resolve/reject calls, the declaration of `wss` and `self`,
// and any change to self.ID between calls are not visible in this block —
// confirm each channel gets a distinct ID.
147 Subscriptions
.prototype.createWebSocketServer = function() {
150 return new Promise(function(resolve
, reject
) {
155 wss
= self
.multiplexer
.registerChannel(self
.ID
);
// Bridges one multiplexer channel (`wss`) to one downstream source.
// `Type` is the constructor to use (PollingSocket or WebSocket). Every browser
// connection on the channel is tracked in `Connections`, and each downstream
// message is written to all tracked connections by send().
// NOTE(review): `Socket`, `Index` and `self` are used below without a visible
// declaration — confirm they are declared in this function; otherwise
// Socket/Index would be globals shared by every channel.
164 Subscriptions
.prototype.socketInstance = function(url
, req
, wss
, Type
, channelId
) {
165 console
.log('Creating a new socketInstance for:', url
, 'sessionId:', req
.session
.id
);
// Browser-side connections currently attached to this channel.
168 var Connections
= [];
170 var sessionId
= req
.session
.id
;
172 var channelIdRef
= channelId
;
173 wss
.on('connection', function(conn
) {
174 console
.log('New connection to multiplex-server for channelId', channelIdRef
);
// Client -> server traffic is only echoed back; per the log text this is test scaffolding.
176 conn
.on('data', function(msg
) {
177 console
.log('Test purposes only. Received message from client:', msg
);
178 conn
.write('Test purposes only. Echo: ' + msg
);
// PollingSocket is constructed with (url, req, 1000, req.body); any other Type
// is constructed from the url alone.
182 if (Type
== PollingSocket
) {
183 Socket
= new Type(url
, req
, 1000, req
.body
);
185 Socket
= new Type(url
);
187 console
.log('Socket assigned for url', url
);
// NOTE(review): conn.index is a running counter, but Connections shrinks when
// a client closes, so the splice in the 'close' handler can hit the wrong slot
// (or none) once an earlier connection has gone. Connections.indexOf(conn)
// would be safer.
189 conn
.index
= Index
++;
190 // Add this client-connection into list of connections for this channelId/wss
191 Connections
.push(conn
);
193 conn
.on('close', function() {
194 // Remove the browser connection from list of Connections for this channelId/wss
195 Connections
.splice(conn
.index
, 1);
196 console
.log('splicing conn.index', conn
.index
,' for channel', channelIdRef
);
198 // Check if no other connections exist
199 if (Connections
.length
== 0) {
200 console
.log('No more connections for', channelId
, '. Will close socket server and downstream socket/poller.');
// NOTE(review): the close calls announced by the next two comments are not
// visible in this block — confirm they exist.
202 // Close downstream socket/poller
205 // Close socket server
208 // Remove from list of socketServers
// NOTE(review): the key here uses wss.id, while subscribe() stores the entry
// under sessionId + successData.id — confirm wss.id equals that channel id,
// otherwise the registry entry is never removed.
209 delete self
.socketServers
[sessionId
+ wss
.id
];
211 // There is no unregisterChannel. Assuming
212 // sockjs/websocket-multiplex do the right
213 // things and cleanup after themselves.
215 console
.log('Error closing socket server: ', e
);
// Handlers are attached to the downstream socket/poller from here on.
222 Socket
.onopen = function() {
223 console
.log('Opened a websocket to southbound server');
226 Socket
.onerror = function(error
) {
227 console
.log('Error on southbound connection. Error:', error
);
230 Socket
.onmessage = function(data
) {
// Transform-by-name is a stub: when req.body.transform is a String, the
// visible branch contains only commented-out calls.
233 if (req
.body
.transform
&& req
.body
.transform
.constructor.name
== "String") {
234 //someTransformObject[req.body.transform](data, send)
235 //req.body.transform(data, send);
// NOTE(review): the bodies of this Type check are not visible — presumably
// they choose what is handed to send(); confirm.
237 if (Type
== PollingSocket
) {
// Broadcasts `payload` to every tracked connection. `is401` records whether
// the payload (parsed from JSON when it is a string) carries statusCode 401.
244 function send(payload
) {
// NOTE(review): JSON.parse throws on a non-JSON string; no guard is visible here.
247 if (typeof payload
== 'string') {
248 var jsonPayload
= JSON
.parse(payload
);
249 is401
= jsonPayload
.statusCode
== 401;
252 is401
= payload
.statusCode
== 401;
// NOTE(review): `i` (and `is401` above) need a local declaration to avoid
// leaking globals — none is visible in send(); confirm an enclosing scope declares them.
258 for (i
= Connections
.length
- 1; i
>= 0; i
-= 1) {
259 // console.log('Sending payload to channelId:', channelId, ' on connection', i);
260 Connections
[i
].write(payload
);
// A failure to close the downstream socket is only logged.
266 console
.log('Error closing Socket')
/**
 * HTTP poller exposing the two members the subscription code uses on a
 * downstream socket: an assignable `onmessage` callback and `close()`.
 *
 * The first request is issued immediately from the constructor. After each
 * successful response the next request is scheduled, then `onmessage` (if
 * set) is called with `response.body`.
 *
 * @param {String} url       Address to poll.
 * @param {Object} req       Incoming request; `req.session.id` is logged and
 *                           `req.get('Authorization')` is forwarded as the
 *                           Authorization header of every poll.
 * @param {Number} interval  Delay in milliseconds between a completed poll
 *                           and the next one. Falls back to 1000 when falsy.
 * @param {Object} config    Optional. `config.method` (default 'GET') and
 *                           `config.payload` (sent as the `json` option).
 */
function PollingSocket(url, req, interval, config) {
  console.log('Creating a new PollingSocket for url', url, 'sessionId:', req.session.id);
  var self = this;
  self.isClosed = false;

  // The delay used to be written `1000 || interval`, which always evaluates
  // to 1000 and silently ignored the caller's interval.
  var pollInterval = interval || 1000;
  // Tolerate a missing config object instead of throwing on config.method.
  var options = config || {};

  var requestHeaders = {};
  _.extend(requestHeaders, {
    'Authorization': req.get('Authorization')
  });

  var pollServer = function() {
    Request({
      url: url,
      method: options.method || 'GET',
      headers: requestHeaders,
      json: options.payload,
      // NOTE(review): rejectUnauthorized: false turns off TLS certificate
      // verification for the polled endpoint — confirm this is intended
      // outside development setups.
      rejectUnauthorized: false,
      forever: constants.FOREVER_ON
    }, function(error, response, body) {
      if (error) {
        // NOTE(review): a failed poll is logged and not rescheduled, so one
        // transport error ends polling for this socket — confirm intended.
        console.log('Error polling: ' + url);
      } else {
        if (!self.isClosed) {
          // Schedule the next poll before delivering, so a throwing
          // onmessage handler cannot stop the polling loop.
          self.poll = setTimeout(pollServer, pollInterval);
          var data = response.body;
          if (self.onmessage) {
            self.onmessage(data);
          }
        }
      }
    });
  };
  pollServer();
}

/**
 * Stops polling: marks the socket closed, so a response that is still in
 * flight neither reschedules nor delivers, and cancels the pending timer.
 */
PollingSocket.prototype.close = function() {
  console.log('Closing PollingSocket');
  this.isClosed = true;
  clearTimeout(this.poll);
};
// Public API of this module: the Subscriptions constructor.
317 module
.exports
= Subscriptions
;