b6cbf56892acb91650be5a7a917fe062430764d9
[osm/riftware.git] /
1 """
2
3 # (c) Copyright RIFT.io, 2013-2016, All Rights Reserved
4 #
5
6 @file rwcalproxytasklet.py
7 @author Austin Cormier(austin.cormier@riftio.com)
8 @date 2015-10-20
9 """
10
11 import asyncio
12 import collections
13 import concurrent.futures
14 import logging
15 import os
16 import sys
17
18 import tornado
19 import tornado.httpserver
20 import tornado.web
21 import tornado.platform.asyncio
22
23 import gi
24 gi.require_version('RwDts', '1.0')
25 gi.require_version('RwcalYang', '1.0')
26 gi.require_version('RwTypes', '1.0')
27 from gi.repository import (
28     RwDts as rwdts,
29     RwcalYang,
30     RwTypes,
31 )
32
33 import rw_peas
34 import rift.tasklets
35
36 if sys.version_info < (3, 4, 4):
37     asyncio.ensure_future = asyncio.async
38
39
40 class CalCallFailure(Exception):
41     pass
42
43
44 class RPCParam(object):
45     def __init__(self, key, proto_type=None):
46         self.key = key
47         self.proto_type = proto_type
48
49
50 class CalRequestHandler(tornado.web.RequestHandler):
51     def initialize(self, log, loop, cal, account, executor, cal_method,
52                    input_params=None, output_params=None):
53         self.log = log
54         self.loop = loop
55         self.cal = cal
56         self.account = account
57         self.executor = executor
58         self.cal_method = cal_method
59         self.input_params = input_params
60         self.output_params = output_params
61
62     def wrap_status_fn(self, fn, *args, **kwargs):
63         ret = fn(*args, **kwargs)
64         if not isinstance(ret, collections.Iterable):
65             ret = [ret]
66
67         rw_status = ret[0]
68         if type(rw_status) != RwTypes.RwStatus:
69             raise ValueError("First return value of %s function was not a RwStatus" %
70                              fn.__name__)
71
72         if rw_status != RwTypes.RwStatus.SUCCESS:
73             msg = "%s returned %s" % (fn.__name__, str(rw_status))
74             self.log.error(msg)
75             raise CalCallFailure(msg)
76
77         return ret[1:]
78
79     @tornado.gen.coroutine
80     def post(self):
81         def body_to_cal_args():
82             cal_args = []
83             if self.input_params is None:
84                 return cal_args
85
86             input_dict = tornado.escape.json_decode(self.request.body)
87             if len(input_dict) != len(self.input_params):
88                 raise ValueError("Got %s parameters, expected %s" %
89                                  (len(input_dict), len(self.input_params)))
90
91             for input_param in self.input_params:
92                 key = input_param.key
93                 value = input_dict[key]
94                 proto_type = input_param.proto_type
95
96                 if proto_type is not None:
97                     proto_cls = getattr(RwcalYang, proto_type)
98                     self.log.debug("Deserializing into %s type", proto_cls)
99                     value = proto_cls.from_dict(value)
100
101                 cal_args.append(value)
102
103             return cal_args
104
105         def cal_return_vals(return_vals):
106             output_params = self.output_params
107             if output_params is None:
108                 output_params = []
109
110             if len(return_vals) != len(output_params):
111                 raise ValueError("Got %s return values.  Expected %s",
112                                  len(return_vals), len(output_params))
113
114             write_dict = {"return_vals": []}
115             for i, output_param in enumerate(output_params):
116                 key = output_param.key
117                 proto_type = output_param.proto_type
118                 output_value = return_vals[i]
119
120                 if proto_type is not None:
121                     output_value = output_value.as_dict()
122
123                 return_val = {
124                         "key": key,
125                         "value": output_value,
126                         "proto_type": proto_type,
127                         }
128
129                 write_dict["return_vals"].append(return_val)
130
131             return write_dict
132
133         @asyncio.coroutine
134         def handle_request():
135             self.log.debug("Got cloudsimproxy POST request: %s", self.request.body)
136             cal_args = body_to_cal_args()
137
138             # Execute the CAL request in a seperate thread to prevent
139             # blocking the main loop.
140             return_vals = yield from self.loop.run_in_executor(
141                     self.executor,
142                     self.wrap_status_fn,
143                     getattr(self.cal, self.cal_method),
144                     self.account,
145                     *cal_args
146                     )
147
148             return cal_return_vals(return_vals)
149
150         f = asyncio.ensure_future(handle_request(), loop=self.loop)
151         return_dict = yield tornado.platform.asyncio.to_tornado_future(f)
152
153         self.log.debug("Responding to %s RPC with %s", self.cal_method, return_dict)
154
155         self.clear()
156         self.set_status(200)
157         self.write(return_dict)
158
159
160 class CalProxyApp(tornado.web.Application):
161     def __init__(self, log, loop, cal_interface, cal_account):
162         self.log = log
163         self.loop = loop
164         self.cal = cal_interface
165         self.account = cal_account
166
167         attrs = dict(
168             log=self.log,
169             loop=self.loop,
170             cal=cal_interface,
171             account=cal_account,
172             # Create an executor with a single worker to prevent
173             # having multiple simulteneous calls into CAL (which is not threadsafe)
174             executor=concurrent.futures.ThreadPoolExecutor(1)
175             )
176
177         def mk_attrs(cal_method, input_params=None, output_params=None):
178             new_attrs = {
179                     "cal_method": cal_method,
180                     "input_params": input_params,
181                     "output_params": output_params
182                     }
183             new_attrs.update(attrs)
184
185             return new_attrs
186
187         super(CalProxyApp, self).__init__([
188             (r"/api/get_image_list", CalRequestHandler,
189                 mk_attrs(
190                     cal_method="get_image_list",
191                     output_params=[
192                         RPCParam("images", "VimResources"),
193                         ]
194                     ),
195                 ),
196
197             (r"/api/create_image", CalRequestHandler,
198                 mk_attrs(
199                     cal_method="create_image",
200                     input_params=[
201                         RPCParam("image", "ImageInfoItem"),
202                         ],
203                     output_params=[
204                         RPCParam("image_id"),
205                         ]
206                     ),
207                 ),
208
209             (r"/api/delete_image", CalRequestHandler,
210                 mk_attrs(
211                     cal_method="delete_image",
212                     input_params=[
213                         RPCParam("image_id"),
214                         ],
215                     ),
216                 ),
217
218             (r"/api/get_image", CalRequestHandler,
219                 mk_attrs(
220                     cal_method="get_image",
221                     input_params=[
222                         RPCParam("image_id"),
223                         ],
224                     output_params=[
225                         RPCParam("image", "ImageInfoItem"),
226                         ],
227                     ),
228                 ),
229
230             (r"/api/create_vm", CalRequestHandler,
231                 mk_attrs(
232                     cal_method="create_vm",
233                     input_params=[
234                         RPCParam("vm", "VMInfoItem"),
235                         ],
236                     output_params=[
237                         RPCParam("vm_id"),
238                         ],
239                     ),
240                 ),
241
242             (r"/api/start_vm", CalRequestHandler,
243                     mk_attrs(
244                         cal_method="start_vm",
245                         input_params=[
246                             RPCParam("vm_id"),
247                             ],
248                         ),
249                     ),
250
251             (r"/api/stop_vm", CalRequestHandler,
252                     mk_attrs(
253                         cal_method="stop_vm",
254                         input_params=[
255                             RPCParam("vm_id"),
256                             ],
257                         ),
258                     ),
259
260             (r"/api/delete_vm", CalRequestHandler,
261                     mk_attrs(
262                         cal_method="delete_vm",
263                         input_params=[
264                             RPCParam("vm_id"),
265                             ],
266                         ),
267                     ),
268
269             (r"/api/reboot_vm", CalRequestHandler,
270                     mk_attrs(
271                         cal_method="reboot_vm",
272                         input_params=[
273                             RPCParam("vm_id"),
274                             ],
275                         ),
276                     ),
277
278             (r"/api/get_vm_list", CalRequestHandler,
279                     mk_attrs(
280                         cal_method="get_vm_list",
281                         output_params=[
282                             RPCParam("vms", "VimResources"),
283                             ],
284                         ),
285                     ),
286
287             (r"/api/get_vm", CalRequestHandler,
288                     mk_attrs(
289                         cal_method="get_vm",
290                         input_params=[
291                             RPCParam("vm_id"),
292                             ],
293                         output_params=[
294                             RPCParam("vms", "VMInfoItem"),
295                             ],
296                         ),
297                     ),
298
299             (r"/api/create_flavor", CalRequestHandler,
300                     mk_attrs(
301                         cal_method="create_flavor",
302                         input_params=[
303                             RPCParam("flavor", "FlavorInfoItem"),
304                             ],
305                         output_params=[
306                             RPCParam("flavor_id"),
307                             ],
308                         ),
309                     ),
310
311             (r"/api/delete_flavor", CalRequestHandler,
312                     mk_attrs(
313                         cal_method="delete_flavor",
314                         input_params=[
315                             RPCParam("flavor_id"),
316                             ],
317                         ),
318                     ),
319
320             (r"/api/get_flavor_list", CalRequestHandler,
321                     mk_attrs(
322                         cal_method="get_flavor_list",
323                         output_params=[
324                             RPCParam("flavors", "VimResources"),
325                             ],
326                         ),
327                     ),
328
329             (r"/api/get_flavor", CalRequestHandler,
330                     mk_attrs(
331                         cal_method="get_flavor",
332                         input_params=[
333                             RPCParam("flavor_id"),
334                             ],
335                         output_params=[
336                             RPCParam("flavor", "FlavorInfoItem"),
337                             ],
338                         ),
339                     ),
340
341             (r"/api/create_network", CalRequestHandler,
342                     mk_attrs(
343                         cal_method="create_network",
344                         input_params=[
345                             RPCParam("network", "NetworkInfoItem"),
346                             ],
347                         output_params=[
348                             RPCParam("network_id"),
349                             ],
350                         ),
351                     ),
352
353             (r"/api/delete_network", CalRequestHandler,
354                     mk_attrs(
355                         cal_method="delete_network",
356                         input_params=[
357                             RPCParam("network_id"),
358                             ],
359                         ),
360                     ),
361
362             (r"/api/get_network", CalRequestHandler,
363                     mk_attrs(
364                         cal_method="get_network",
365                         input_params=[
366                             RPCParam("network_id"),
367                             ],
368                         output_params=[
369                             RPCParam("network", "NetworkInfoItem"),
370                             ],
371                         ),
372                     ),
373
374             (r"/api/get_network_list", CalRequestHandler,
375                     mk_attrs(
376                         cal_method="get_network_list",
377                         output_params=[
378                             RPCParam("networks", "VimResources"),
379                             ],
380                         ),
381                     ),
382
383             (r"/api/get_management_network", CalRequestHandler,
384                     mk_attrs(
385                         cal_method="get_management_network",
386                         output_params=[
387                             RPCParam("network", "NetworkInfoItem"),
388                             ],
389                         ),
390                     ),
391
392             (r"/api/create_port", CalRequestHandler,
393                     mk_attrs(
394                         cal_method="create_port",
395                         input_params=[
396                             RPCParam("port", "PortInfoItem"),
397                             ],
398                         output_params=[
399                             RPCParam("port_id"),
400                             ],
401                         ),
402                     ),
403
404             (r"/api/delete_port", CalRequestHandler,
405                     mk_attrs(
406                         cal_method="delete_port",
407                         input_params=[
408                             RPCParam("port_id"),
409                             ],
410                         ),
411                     ),
412
413             (r"/api/get_port", CalRequestHandler,
414                     mk_attrs(
415                         cal_method="get_port",
416                         input_params=[
417                             RPCParam("port_id"),
418                             ],
419                         output_params=[
420                             RPCParam("port", "PortInfoItem"),
421                             ],
422                         ),
423                     ),
424
425             (r"/api/get_port_list", CalRequestHandler,
426                     mk_attrs(
427                         cal_method="get_port_list",
428                         output_params=[
429                             RPCParam("ports", "VimResources"),
430                             ],
431                         ),
432                     ),
433
434             (r"/api/create_virtual_link", CalRequestHandler,
435                     mk_attrs(
436                         cal_method="create_virtual_link",
437                         input_params=[
438                             RPCParam("link_params", "VirtualLinkReqParams"),
439                             ],
440                         output_params=[
441                             RPCParam("link_id"),
442                             ],
443                         ),
444                     ),
445
446             (r"/api/delete_virtual_link", CalRequestHandler,
447                     mk_attrs(
448                         cal_method="delete_virtual_link",
449                         input_params=[
450                             RPCParam("link_id"),
451                             ],
452                         ),
453                     ),
454
455             (r"/api/get_virtual_link", CalRequestHandler,
456                     mk_attrs(
457                         cal_method="get_virtual_link",
458                         input_params=[
459                             RPCParam("link_id"),
460                             ],
461                         output_params=[
462                             RPCParam("response", "VirtualLinkInfoParams"),
463                             ],
464                         ),
465                     ),
466
467             (r"/api/get_virtual_link_list", CalRequestHandler,
468                     mk_attrs(
469                         cal_method="get_virtual_link_list",
470                         output_params=[
471                             RPCParam("resources", "VNFResources"),
472                             ],
473                         ),
474                     ),
475
476             (r"/api/create_vdu", CalRequestHandler,
477                     mk_attrs(
478                         cal_method="create_vdu",
479                         input_params=[
480                             RPCParam("vdu_params", "VDUInitParams"),
481                             ],
482                         output_params=[
483                             RPCParam("vdu_id"),
484                             ],
485                         ),
486                     ),
487
488             (r"/api/modify_vdu", CalRequestHandler,
489                     mk_attrs(
490                         cal_method="modify_vdu",
491                         input_params=[
492                             RPCParam("vdu_params", "VDUModifyParams"),
493                             ],
494                         ),
495                     ),
496
497             (r"/api/delete_vdu", CalRequestHandler,
498                     mk_attrs(
499                         cal_method="delete_vdu",
500                         input_params=[
501                             RPCParam("vdu_id"),
502                             ],
503                         ),
504                     ),
505
506             (r"/api/get_vdu", CalRequestHandler,
507                     mk_attrs(
508                         cal_method="get_vdu",
509                         input_params=[
510                             RPCParam("vdu_id"),
511                             ],
512                         output_params=[
513                             RPCParam("response", "VDUInfoParams"),
514                             ],
515                         ),
516                     ),
517
518             (r"/api/get_vdu_list", CalRequestHandler,
519                     mk_attrs(
520                         cal_method="get_vdu_list",
521                         output_params=[
522                             RPCParam("resources", "VNFResources"),
523                             ],
524                         ),
525                     )
526             ])
527
528
529 class RwCalProxyTasklet(rift.tasklets.Tasklet):
530     HTTP_PORT = 9002
531     cal_interface = None
532
533     def __init__(self, *args, **kwargs):
534         super().__init__(*args, **kwargs)
535
536         self.app = None
537         self.server = None
538
539     def get_cal_interface(self):
540         if RwCalProxyTasklet.cal_interface is None:
541             plugin = rw_peas.PeasPlugin('rwcal_cloudsim', 'RwCal-1.0')
542             engine, info, extension = plugin()
543
544             RwCalProxyTasklet.cal_interface = plugin.get_interface("Cloud")
545             RwCalProxyTasklet.cal_interface.init(self.log_hdl)
546
547         return RwCalProxyTasklet.cal_interface
548
549     def start(self):
550         """Tasklet entry point"""
551         self.log.setLevel(logging.DEBUG)
552
553         super().start()
554
555         cal = self.get_cal_interface()
556         account = RwcalYang.CloudAccount(account_type="cloudsim")
557
558         self.app = CalProxyApp(self.log, self.loop, cal, account)
559         self._dts = rift.tasklets.DTS(
560                 self.tasklet_info,
561                 RwcalYang.get_schema(),
562                 self.loop,
563                 self.on_dts_state_change
564                 )
565
566         io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
567         self.server = tornado.httpserver.HTTPServer(
568                 self.app,
569                 io_loop=io_loop,
570                 )
571
572         self.log.info("Starting Cal Proxy Http Server on port %s",
573                       RwCalProxyTasklet.HTTP_PORT)
574         self.server.listen(RwCalProxyTasklet.HTTP_PORT)
575
576     def stop(self):
577       try:
578          self.server.stop()
579          self._dts.deinit()
580       except Exception:
581          print("Caught Exception in LP stop:", sys.exc_info()[0])
582          raise
583
584     @asyncio.coroutine
585     def init(self):
586         pass
587
588     @asyncio.coroutine
589     def run(self):
590         pass
591
592     @asyncio.coroutine
593     def on_dts_state_change(self, state):
594         """Take action according to current dts state to transition
595         application into the corresponding application state
596
597         Arguments
598             state - current dts state
599         """
600
601         switch = {
602             rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
603             rwdts.State.CONFIG: rwdts.State.RUN,
604         }
605
606         handlers = {
607             rwdts.State.INIT: self.init,
608             rwdts.State.RUN: self.run,
609         }
610
611         # Transition application to next state
612         handler = handlers.get(state, None)
613         if handler is not None:
614             yield from handler()
615
616         # Transition dts to next state
617         next_state = switch.get(state, None)
618         if next_state is not None:
619             self._dts.handle.set_state(next_state)