3 # (c) Copyright RIFT.io, 2013-2016, All Rights Reserved
6 @file rwcalproxytasklet.py
7 @author Austin Cormier(austin.cormier@riftio.com)
13 import concurrent.futures
19 import tornado.httpserver
21 import tornado.platform.asyncio
24 gi.require_version('RwDts', '1.0')
25 gi.require_version('RwcalYang', '1.0')
26 gi.require_version('RwTypes', '1.0')
27 from gi.repository import (
# asyncio.ensure_future() only exists from Python 3.4.4 onwards; older
# interpreters expose the same function under the name "async".  Because
# "async" is a reserved keyword since Python 3.7, spelling the attribute
# literally (asyncio.async) makes the whole module a SyntaxError on modern
# interpreters, even though this branch would never execute there.  Fetch
# the attribute by string so the file parses on every Python version.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
40 class CalCallFailure(Exception):
44 class RPCParam(object):
45 def __init__(self, key, proto_type=None):
47 self.proto_type = proto_type
50 class CalRequestHandler(tornado.web.RequestHandler):
51 def initialize(self, log, loop, cal, account, executor, cal_method,
52 input_params=None, output_params=None):
56 self.account = account
57 self.executor = executor
58 self.cal_method = cal_method
59 self.input_params = input_params
60 self.output_params = output_params
62 def wrap_status_fn(self, fn, *args, **kwargs):
63 ret = fn(*args, **kwargs)
64 if not isinstance(ret, collections.Iterable):
68 if type(rw_status) != RwTypes.RwStatus:
69 raise ValueError("First return value of %s function was not a RwStatus" %
72 if rw_status != RwTypes.RwStatus.SUCCESS:
73 msg = "%s returned %s" % (fn.__name__, str(rw_status))
75 raise CalCallFailure(msg)
79 @tornado.gen.coroutine
81 def body_to_cal_args():
83 if self.input_params is None:
86 input_dict = tornado.escape.json_decode(self.request.body)
87 if len(input_dict) != len(self.input_params):
88 raise ValueError("Got %s parameters, expected %s" %
89 (len(input_dict), len(self.input_params)))
91 for input_param in self.input_params:
93 value = input_dict[key]
94 proto_type = input_param.proto_type
96 if proto_type is not None:
97 proto_cls = getattr(RwcalYang, proto_type)
98 self.log.debug("Deserializing into %s type", proto_cls)
99 value = proto_cls.from_dict(value)
101 cal_args.append(value)
105 def cal_return_vals(return_vals):
106 output_params = self.output_params
107 if output_params is None:
# The CAL call must hand back exactly one value per declared output
# parameter.  Build the message with "%" formatting, as the input
# parameter count check does: passing the counts as extra ValueError
# arguments (logging style) would leave the "%s" placeholders unfilled
# and turn the exception args into a tuple.
if len(return_vals) != len(output_params):
    raise ValueError("Got %s return values. Expected %s" %
                     (len(return_vals), len(output_params)))
114 write_dict = {"return_vals": []}
115 for i, output_param in enumerate(output_params):
116 key = output_param.key
117 proto_type = output_param.proto_type
118 output_value = return_vals[i]
120 if proto_type is not None:
121 output_value = output_value.as_dict()
125 "value": output_value,
126 "proto_type": proto_type,
129 write_dict["return_vals"].append(return_val)
134 def handle_request():
135 self.log.debug("Got cloudsimproxy POST request: %s", self.request.body)
136 cal_args = body_to_cal_args()
138 # Execute the CAL request in a separate thread to prevent
139 # blocking the main loop.
140 return_vals = yield from self.loop.run_in_executor(
143 getattr(self.cal, self.cal_method),
148 return cal_return_vals(return_vals)
150 f = asyncio.ensure_future(handle_request(), loop=self.loop)
151 return_dict = yield tornado.platform.asyncio.to_tornado_future(f)
153 self.log.debug("Responding to %s RPC with %s", self.cal_method, return_dict)
157 self.write(return_dict)
160 class CalProxyApp(tornado.web.Application):
161 def __init__(self, log, loop, cal_interface, cal_account):
164 self.cal = cal_interface
165 self.account = cal_account
172 # Create an executor with a single worker to prevent
173 # having multiple simultaneous calls into CAL (which is not threadsafe)
174 executor=concurrent.futures.ThreadPoolExecutor(1)
177 def mk_attrs(cal_method, input_params=None, output_params=None):
179 "cal_method": cal_method,
180 "input_params": input_params,
181 "output_params": output_params
183 new_attrs.update(attrs)
187 super(CalProxyApp, self).__init__([
188 (r"/api/get_image_list", CalRequestHandler,
190 cal_method="get_image_list",
192 RPCParam("images", "VimResources"),
197 (r"/api/create_image", CalRequestHandler,
199 cal_method="create_image",
201 RPCParam("image", "ImageInfoItem"),
204 RPCParam("image_id"),
209 (r"/api/delete_image", CalRequestHandler,
211 cal_method="delete_image",
213 RPCParam("image_id"),
218 (r"/api/get_image", CalRequestHandler,
220 cal_method="get_image",
222 RPCParam("image_id"),
225 RPCParam("image", "ImageInfoItem"),
230 (r"/api/create_vm", CalRequestHandler,
232 cal_method="create_vm",
234 RPCParam("vm", "VMInfoItem"),
242 (r"/api/start_vm", CalRequestHandler,
244 cal_method="start_vm",
251 (r"/api/stop_vm", CalRequestHandler,
253 cal_method="stop_vm",
260 (r"/api/delete_vm", CalRequestHandler,
262 cal_method="delete_vm",
269 (r"/api/reboot_vm", CalRequestHandler,
271 cal_method="reboot_vm",
278 (r"/api/get_vm_list", CalRequestHandler,
280 cal_method="get_vm_list",
282 RPCParam("vms", "VimResources"),
287 (r"/api/get_vm", CalRequestHandler,
294 RPCParam("vms", "VMInfoItem"),
299 (r"/api/create_flavor", CalRequestHandler,
301 cal_method="create_flavor",
303 RPCParam("flavor", "FlavorInfoItem"),
306 RPCParam("flavor_id"),
311 (r"/api/delete_flavor", CalRequestHandler,
313 cal_method="delete_flavor",
315 RPCParam("flavor_id"),
320 (r"/api/get_flavor_list", CalRequestHandler,
322 cal_method="get_flavor_list",
324 RPCParam("flavors", "VimResources"),
329 (r"/api/get_flavor", CalRequestHandler,
331 cal_method="get_flavor",
333 RPCParam("flavor_id"),
336 RPCParam("flavor", "FlavorInfoItem"),
341 (r"/api/create_network", CalRequestHandler,
343 cal_method="create_network",
345 RPCParam("network", "NetworkInfoItem"),
348 RPCParam("network_id"),
353 (r"/api/delete_network", CalRequestHandler,
355 cal_method="delete_network",
357 RPCParam("network_id"),
362 (r"/api/get_network", CalRequestHandler,
364 cal_method="get_network",
366 RPCParam("network_id"),
369 RPCParam("network", "NetworkInfoItem"),
374 (r"/api/get_network_list", CalRequestHandler,
376 cal_method="get_network_list",
378 RPCParam("networks", "VimResources"),
383 (r"/api/get_management_network", CalRequestHandler,
385 cal_method="get_management_network",
387 RPCParam("network", "NetworkInfoItem"),
392 (r"/api/create_port", CalRequestHandler,
394 cal_method="create_port",
396 RPCParam("port", "PortInfoItem"),
404 (r"/api/delete_port", CalRequestHandler,
406 cal_method="delete_port",
413 (r"/api/get_port", CalRequestHandler,
415 cal_method="get_port",
420 RPCParam("port", "PortInfoItem"),
425 (r"/api/get_port_list", CalRequestHandler,
427 cal_method="get_port_list",
429 RPCParam("ports", "VimResources"),
434 (r"/api/create_virtual_link", CalRequestHandler,
436 cal_method="create_virtual_link",
438 RPCParam("link_params", "VirtualLinkReqParams"),
446 (r"/api/delete_virtual_link", CalRequestHandler,
448 cal_method="delete_virtual_link",
455 (r"/api/get_virtual_link", CalRequestHandler,
457 cal_method="get_virtual_link",
462 RPCParam("response", "VirtualLinkInfoParams"),
467 (r"/api/get_virtual_link_list", CalRequestHandler,
469 cal_method="get_virtual_link_list",
471 RPCParam("resources", "VNFResources"),
476 (r"/api/create_vdu", CalRequestHandler,
478 cal_method="create_vdu",
480 RPCParam("vdu_params", "VDUInitParams"),
488 (r"/api/modify_vdu", CalRequestHandler,
490 cal_method="modify_vdu",
492 RPCParam("vdu_params", "VDUModifyParams"),
497 (r"/api/delete_vdu", CalRequestHandler,
499 cal_method="delete_vdu",
506 (r"/api/get_vdu", CalRequestHandler,
508 cal_method="get_vdu",
513 RPCParam("response", "VDUInfoParams"),
518 (r"/api/get_vdu_list", CalRequestHandler,
520 cal_method="get_vdu_list",
522 RPCParam("resources", "VNFResources"),
529 class RwCalProxyTasklet(rift.tasklets.Tasklet):
533 def __init__(self, *args, **kwargs):
534 super().__init__(*args, **kwargs)
539 def get_cal_interface(self):
540 if RwCalProxyTasklet.cal_interface is None:
541 plugin = rw_peas.PeasPlugin('rwcal_cloudsim', 'RwCal-1.0')
542 engine, info, extension = plugin()
544 RwCalProxyTasklet.cal_interface = plugin.get_interface("Cloud")
545 RwCalProxyTasklet.cal_interface.init(self.log_hdl)
547 return RwCalProxyTasklet.cal_interface
550 """Tasklet entry point"""
551 self.log.setLevel(logging.DEBUG)
555 cal = self.get_cal_interface()
556 account = RwcalYang.CloudAccount(account_type="cloudsim")
558 self.app = CalProxyApp(self.log, self.loop, cal, account)
559 self._dts = rift.tasklets.DTS(
561 RwcalYang.get_schema(),
563 self.on_dts_state_change
566 io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
567 self.server = tornado.httpserver.HTTPServer(
572 self.log.info("Starting Cal Proxy Http Server on port %s",
573 RwCalProxyTasklet.HTTP_PORT)
574 self.server.listen(RwCalProxyTasklet.HTTP_PORT)
581 print("Caught Exception in LP stop:", sys.exc_info()[0])
593 def on_dts_state_change(self, state):
594 """Take action according to current dts state to transition
595 application into the corresponding application state
598 state - current dts state
602 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
603 rwdts.State.CONFIG: rwdts.State.RUN,
607 rwdts.State.INIT: self.init,
608 rwdts.State.RUN: self.run,
611 # Transition application to next state
612 handler = handlers.get(state, None)
613 if handler is not None:
616 # Transition dts to next state
617 next_state = switch.get(state, None)
618 if next_state is not None:
619 self._dts.handle.set_state(next_state)