RIFT-15318: Hide assets tab when there are none
[osm/UI.git] / skyquake / framework / core / api_utils / sockets.js
1 /*
2 *
3 * Copyright 2016 RIFT.IO Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 //SOCKET MANAGER
19 // test
20 //Supports localhost node polling subscriptions and pass through subscriptions to other websockets
21 //
22 //TODO REFACTOR: this needs to happen. there's too much boilerplate code in here.
23 //TODO Document after refactoring
24 //TODO Improved logging for debugging
25 //TODO List of URLS
26
27 var WebSocket = require('ws');
28 var Request = require('request');
29 var _ = require('lodash');
30 var constants = require('./constants.js');
31 var Promise = require('promise');
32 var url = require('url');
33 var sockjs = require('sockjs');
34 var websocket_multiplex = require('websocket-multiplex');
35
36
/**
 * Map an API-server protocol name to the port this module assumes for it.
 *
 * @param {String} protocol Protocol name without the trailing '://'.
 * @return {Number|undefined} 8000 for 'http', 8443 for 'https', otherwise undefined.
 */
function getPortForProtocol (protocol) {
  if (protocol === 'http') {
    return 8000;
  }
  if (protocol === 'https') {
    return 8443;
  }
  // Any other protocol has no known port.
  return undefined;
}
45
/**
 * Socket manager. Holds the counter used to number channels and a registry
 * of the channels created so far.
 *
 * @constructor
 */
var Subscriptions = function Subscriptions() {
  // Last channel id handed out; incremented for every new channel.
  this.ID = 0;
  // Registry of created channels, keyed by session id + channel id.
  this.socketServers = {};
};
50
/**
 * Store the configuration, mark the manager as ready and attach a SockJS
 * server with channel multiplexing to the configured HTTP server.
 *
 * @param {Object} config
 * @param {Object} config.httpServer Server the SockJS handlers are installed on,
 *                                   under the '/multiplex' prefix.
 */
Subscriptions.prototype.configure = function(config) {
  this.config = config;
  this.ready = true;

  // SockJS server created with default options.
  this.service = sockjs.createServer({});

  // The multiplexer lets many logical channels share the one SockJS service.
  this.multiplexer = new websocket_multiplex.MultiplexServer(this.service);

  var handlerOptions = {prefix:'/multiplex'};
  this.service.installHandlers(this.config.httpServer, handlerOptions);
};
62
/**
 * Work out the absolute URL and protocol a subscription request points at.
 *
 * A URL that already carries a scheme is used unchanged. A relative URL is
 * resolved against the `api_server` query parameter when present, otherwise
 * against the protocol and host of the incoming request.
 *
 * @param {Object} req Incoming request (body.url, query, protocol, headers.host, baseUrl).
 * @return {{protocol: String, url: String}}
 * @throws {Error} When the url is missing or api_server has no scheme.
 */
function resolveSubscriptionTarget(req) {
  var protocolTest = /^(.{2,5}):\/\//;
  var requested = req.body && req.body.url;

  if (typeof requested !== 'string' || requested.length === 0) {
    throw new Error('Subscription request is missing a url');
  }

  var match = requested.match(protocolTest);
  if (match) {
    return { protocol: match[1], url: requested };
  }

  var protocol;
  var origin;
  var apiServer = req.query && req.query['api_server'];

  if (apiServer) {
    var apiServerMatch = String(apiServer).match(protocolTest);
    if (!apiServerMatch) {
      throw new Error('api_server must include a protocol (for example https://host)');
    }
    protocol = apiServerMatch[1];
    var port = getPortForProtocol(protocol);
    // Only append a port when one is known; otherwise the origin would end in ":undefined".
    origin = (port === undefined) ? String(apiServer) : apiServer + ':' + port;
  } else {
    // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE
    // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC
    // No protocol was passed with the url in the body. Assume req.protocol.
    protocol = req.protocol || 'https';
    origin = protocol + '://' + req.headers.host;
  }

  // Convert the relative URL to a full path: origin -> baseUrl -> requested path.
  var base = url.resolve(origin, req.baseUrl || '');
  var resolved = url.resolve(base, requested);
  console.log('DEBUG URL IS', resolved);

  return { protocol: protocol, url: resolved };
}

/**
 * Create a multiplexed channel for the requesting session and wire it to the
 * data source named in the request.
 *
 * NOTE(review): req.body.url is caller-supplied and the server will connect to
 * or poll whatever it names; confirm that callers are trusted or add an allow-list.
 *
 * @param {Object} req
 * @param {String} req.body.url May be http, https, ws or wss, or a relative path.
 * @param {String} [req.body.transform] Passed through with the request to the
 *                                      socket instance created for the channel.
 * @param {String} [req.query.api_server] Origin (with scheme) used to resolve a relative url.
 * @param {Function} [callback] Unused; kept so existing call sites keep working.
 * @return {Promise} Resolves with {statusCode: 200, data: {id: Number}} where id is
 *                   the channel id. Rejects with {statusCode, errorMessage}:
 *                   500 when the manager is not configured or the channel could not
 *                   be wired up, 400 for an unusable request, 503 when the channel
 *                   could not be created.
 */
Subscriptions.prototype.subscribe = function(req, callback) {
  var self = this;

  return new Promise(function(resolve, reject) {
    var sessionId;
    var target;

    if (!self.ready) {
      return reject({
        statusCode: 500,
        errorMessage: 'SocketManager not configured yet. Cannot proceed'
      });
    }

    // Report bad input as a rejection instead of a synchronous throw.
    try {
      sessionId = req.session.id;
      target = resolveSubscriptionTarget(req);
    } catch (e) {
      return reject({
        statusCode: 400,
        errorMessage: e.message
      });
    }

    self.createWebSocketServer().then(function(successData) {
      var key = sessionId + successData.id;

      // An exception here would otherwise leave the returned promise pending forever.
      try {
        self.socketServers[key] = successData;
        self.setUpSocketInstance(target.protocol, target.url, req, successData.wss, successData.id);
      } catch (e) {
        delete self.socketServers[key];
        return reject({
          statusCode: 500,
          errorMessage: e.message
        });
      }

      return resolve({
        statusCode: 200,
        data: {
          id: successData.id
        }
      });
    },
    function(errorData) {
      // The rejection may be a plain {error: ...} object or a thrown Error.
      var message = errorData && (errorData.error || errorData.message);
      return reject({
        statusCode: 503,
        errorMessage: message || 'Unable to create socket server'
      });
    });
  });
};
135
/**
 * Pick the downstream socket type for a protocol and create the socket
 * instance for the channel. Unrecognised protocols are ignored.
 *
 * @param {String} protocol  'http' / 'https' use PollingSocket, 'ws' / 'wss' use WebSocket.
 * @param {String} URL       Downstream URL.
 * @param {Object} req       Originating request.
 * @param {Object} wss       Multiplexer channel for this subscription.
 * @param {Number} channelId Id of that channel.
 */
Subscriptions.prototype.setUpSocketInstance = function(protocol, URL, req, wss, channelId) {
  //Need to refactor this to make it more scalable/dynamic
  var SocketType;

  if (protocol === 'http' || protocol === 'https') {
    SocketType = PollingSocket;
  } else if (protocol === 'ws' || protocol === 'wss') {
    SocketType = WebSocket;
  } else {
    return;
  }

  this.socketInstance(URL, req, wss, SocketType, channelId);
};
154
/**
 * Allocate the next channel id and register a channel under it on the
 * multiplexer.
 *
 * @return {Promise} Resolves with {id: Number, wss: Object}, where wss is the
 *                   value returned by multiplexer.registerChannel.
 */
Subscriptions.prototype.createWebSocketServer = function() {
  var manager = this;

  return new Promise(function(resolve, reject) {
    // Ids start at 1; the counter is bumped before the channel is registered.
    manager.ID++;

    var channel = manager.multiplexer.registerChannel(manager.ID);

    return resolve({
      id: manager.ID,
      wss: channel
    });
  });
};
171
/**
 * Bridge one multiplexer channel to one downstream data source.
 *
 * The downstream socket (a PollingSocket or a WebSocket, depending on Type) is
 * created when the first browser connection arrives on the channel and every
 * message it produces is written to all currently open connections. When the
 * last connection closes, the downstream socket is closed and the channel is
 * removed from this.socketServers; a later connection creates a fresh
 * downstream socket.
 *
 * @param {String}   url       Downstream URL to poll or connect to.
 * @param {Object}   req       Originating request; req.session.id and req.body are read.
 * @param {Object}   wss       Multiplexer channel; must emit 'connection'.
 * @param {Function} Type      Downstream socket constructor (PollingSocket or WebSocket).
 * @param {Number}   channelId Id the channel was registered under.
 */
Subscriptions.prototype.socketInstance = function(url, req, wss, Type, channelId) {
  console.log('Creating a new socketInstance for:', url, 'sessionId:', req.session.id);
  var self = this;
  var Socket = null;
  var Connections = [];
  var Index = 0;
  var sessionId = req.session.id;

  // Write a downstream payload to every open connection; a 401 stops the source.
  function broadcast(payload, source) {
    var i;
    var is401 = false;
    try {
      if (typeof payload == 'string') {
        var jsonPayload = JSON.parse(payload);
        is401 = jsonPayload.statusCode == 401;
      }
      else {
        is401 = payload.statusCode == 401;
      }
    } catch(e) {
      // Unparseable (or null) payloads are replaced by an empty object.
      payload = {};
    }

    // Iterate backwards so a connection dropping out mid-loop cannot skip another.
    for (i = Connections.length - 1; i >= 0; i -= 1) {
      Connections[i].write(payload);
    }

    if (is401) {
      try {
        source.close();
      } catch (e) {
        console.log('Error closing Socket');
      }
    }
  }

  // Create the downstream socket and attach its handlers exactly once.
  function createDownstreamSocket() {
    var socket;
    if (Type == PollingSocket) {
      socket = new Type(url, req, 1000, req.body);
    } else {
      socket = new Type(url);
    }

    socket.onopen = function() {
      console.log('Opened a websocket to southbound server');
    };

    socket.onerror = function(error) {
      console.log('Error on southbound connection. Error:', error);
    };

    socket.onmessage = function(data) {
      if (req.body.transform && req.body.transform.constructor.name == "String") {
        // String transforms are not implemented; nothing is forwarded in this case.
        return;
      }
      // PollingSocket hands over the body itself, WebSocket a message event.
      broadcast(Type == PollingSocket ? data : data.data, socket);
    };

    return socket;
  }

  wss.on('connection', function(conn) {
    console.log('New connection to multiplex-server for channelId', channelId);

    conn.on('data', function(msg) {
      console.log('Test purposes only. Received message from client:', msg);
      conn.write('Test purposes only. Echo: ' + msg);
    });

    if (!Socket) {
      // The URL is caller-supplied; a constructor failure must not escape the event handler.
      try {
        Socket = createDownstreamSocket();
      } catch (e) {
        console.log('Error creating downstream socket for url', url, e);
        conn.end();
        return;
      }
      console.log('Socket assigned for url', url);
    }

    // Kept for logging only; removal below goes by identity because positions
    // shift whenever an earlier connection is removed.
    conn.index = Index++;
    // Add this client-connection into list of connections for this channelId/wss
    Connections.push(conn);

    conn.on('close', function() {
      // Remove the browser connection from list of Connections for this channelId/wss
      var position = Connections.indexOf(conn);
      if (position !== -1) {
        Connections.splice(position, 1);
      }
      console.log('splicing conn.index', conn.index,' for channel', channelId);

      // Check if no other connections exist
      if (Connections.length == 0) {
        console.log('No more connections for', channelId, '. Will close socket server and downstream socket/poller.');
        try {
          // Close downstream socket/poller
          if (Socket) {
            Socket.close();
          }

          // Close socket server
          conn.end();
        } catch (e) {
          console.log('Error closing socket server: ', e);
        }

        // Remove from list of socketServers, using the same key the entry
        // was stored under (session id + channel id).
        delete self.socketServers[sessionId + channelId];

        // There is no unregisterChannel. Assuming
        // sockjs/websocket-multiplex do the right
        // things and cleanup after themselves.
        Index = 0;
        // Drop the closed socket so the next connection creates a new one.
        Socket = null;
      }
    });
  });
};
282
/**
 * Socket-like wrapper that polls an HTTP(S) URL and hands every response body
 * to `onmessage`.
 *
 * The first request is issued immediately; each successful response schedules
 * the next one. Polling stops after close() or after a request error, which is
 * reported through `onerror` when that handler is set.
 *
 * @param {String} url        URL to poll.
 * @param {Object} req        Originating request; its Authorization header is forwarded.
 * @param {Number} [interval] Delay between polls in milliseconds (default 1000).
 * @param {Object} [config]
 * @param {String} [config.method]  HTTP method (default 'GET').
 * @param {*}      [config.payload] Passed as the request's `json` option.
 */
function PollingSocket(url, req, interval, config) {
  console.log('Creating a new PollingSocket for url', url, 'sessionId:', req.session.id);
  var self = this;
  var options = config || {};
  // Honour the caller's interval; fall back to one second when it is unusable.
  var delay = (typeof interval === 'number' && interval > 0) ? interval : 1000;
  var requestHeaders = {
    'Authorization': req.get('Authorization')
  };

  self.isClosed = false;

  var pollServer = function() {
    Request({
      url: url,
      method: options.method || 'GET',
      headers: requestHeaders,
      json: options.payload,
      // NOTE(review): TLS certificate verification is disabled here - confirm this is intended.
      rejectUnauthorized: false,
      forever: constants.FOREVER_ON
    }, function(error, response, body) {
      if (error) {
        console.log('Error polling: ' + url);
        // Tell the owner instead of going silent; polling is not rescheduled.
        if (!self.isClosed && self.onerror) {
          self.onerror(error);
        }
        return;
      }
      if (self.isClosed) {
        return;
      }
      // Schedule the next poll before delivering, so a throwing handler cannot stop polling.
      self.poll = setTimeout(pollServer, delay);
      if (self.onmessage) {
        self.onmessage(response.body);
      }
    });
  };
  pollServer();
}
316
/**
 * Stop polling: mark the socket closed so an in-flight response is ignored,
 * and cancel the pending timer, if any.
 */
PollingSocket.prototype.close = function() {
  console.log('Closing PollingSocket');
  this.isClosed = true;
  clearTimeout(this.poll);
};
323
324
325 module.exports = Subscriptions;