3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 @file rwcalproxytasklet.py
19 @author Austin Cormier(austin.cormier@riftio.com)
25 import concurrent
.futures
31 import tornado
.httpserver
33 import tornado
.platform
.asyncio
36 gi
.require_version('RwDts', '1.0')
37 gi
.require_version('RwcalYang', '1.0')
38 gi
.require_version('RwTypes', '1.0')
39 gi
.require_version('RwCal', '1.0')
41 from gi
.repository
import (
50 if sys
.version_info
< (3, 4, 4):
51 asyncio
.ensure_future
= asyncio
.async
54 class CalCallFailure(Exception):
58 class RPCParam(object):
59 def __init__(self
, key
, proto_type
=None):
61 self
.proto_type
= proto_type
64 class CalRequestHandler(tornado
.web
.RequestHandler
):
65 def initialize(self
, log
, loop
, cal
, account
, executor
, cal_method
,
66 input_params
=None, output_params
=None):
70 self
.account
= account
71 self
.executor
= executor
72 self
.cal_method
= cal_method
73 self
.input_params
= input_params
74 self
.output_params
= output_params
76 def wrap_status_fn(self
, fn
, *args
, **kwargs
):
77 ret
= fn(*args
, **kwargs
)
78 if not isinstance(ret
, collections
.Iterable
):
82 if type(rw_status
) != RwTypes
.RwStatus
:
83 raise ValueError("First return value of %s function was not a RwStatus" %
86 if rw_status
!= RwTypes
.RwStatus
.SUCCESS
:
87 msg
= "%s returned %s" % (fn
.__name
__, str(rw_status
))
89 raise CalCallFailure(msg
)
93 @tornado.gen
.coroutine
95 def body_to_cal_args():
97 if self
.input_params
is None:
100 input_dict
= tornado
.escape
.json_decode(self
.request
.body
)
101 if len(input_dict
) != len(self
.input_params
):
102 raise ValueError("Got %s parameters, expected %s" %
103 (len(input_dict
), len(self
.input_params
)))
105 for input_param
in self
.input_params
:
106 key
= input_param
.key
107 value
= input_dict
[key
]
108 proto_type
= input_param
.proto_type
110 if proto_type
is not None:
111 proto_cls
= getattr(RwcalYang
, proto_type
)
112 self
.log
.debug("Deserializing into %s type", proto_cls
)
113 value
= proto_cls
.from_dict(value
)
115 cal_args
.append(value
)
119 def cal_return_vals(return_vals
):
120 output_params
= self
.output_params
121 if output_params
is None:
124 if len(return_vals
) != len(output_params
):
125 raise ValueError("Got %s return values. Expected %s",
126 len(return_vals
), len(output_params
))
128 write_dict
= {"return_vals": []}
129 for i
, output_param
in enumerate(output_params
):
130 key
= output_param
.key
131 proto_type
= output_param
.proto_type
132 output_value
= return_vals
[i
]
134 if proto_type
is not None:
135 output_value
= output_value
.as_dict()
139 "value": output_value
,
140 "proto_type": proto_type
,
143 write_dict
["return_vals"].append(return_val
)
148 def handle_request():
149 self
.log
.debug("Got cloudsimproxy POST request: %s", self
.request
.body
)
150 cal_args
= body_to_cal_args()
152 # Execute the CAL request in a seperate thread to prevent
153 # blocking the main loop.
154 return_vals
= yield from self
.loop
.run_in_executor(
157 getattr(self
.cal
, self
.cal_method
),
162 return cal_return_vals(return_vals
)
164 f
= asyncio
.ensure_future(handle_request(), loop
=self
.loop
)
165 return_dict
= yield tornado
.platform
.asyncio
.to_tornado_future(f
)
167 self
.log
.debug("Responding to %s RPC with %s", self
.cal_method
, return_dict
)
171 self
.write(return_dict
)
174 class CalProxyApp(tornado
.web
.Application
):
175 def __init__(self
, log
, loop
, cal_interface
, cal_account
):
178 self
.cal
= cal_interface
179 self
.account
= cal_account
186 # Create an executor with a single worker to prevent
187 # having multiple simulteneous calls into CAL (which is not threadsafe)
188 executor
=concurrent
.futures
.ThreadPoolExecutor(1)
191 def mk_attrs(cal_method
, input_params
=None, output_params
=None):
193 "cal_method": cal_method
,
194 "input_params": input_params
,
195 "output_params": output_params
197 new_attrs
.update(attrs
)
201 super(CalProxyApp
, self
).__init
__([
202 (r
"/api/get_image_list", CalRequestHandler
,
204 cal_method
="get_image_list",
206 RPCParam("images", "VimResources"),
211 (r
"/api/create_image", CalRequestHandler
,
213 cal_method
="create_image",
215 RPCParam("image", "ImageInfoItem"),
218 RPCParam("image_id"),
223 (r
"/api/delete_image", CalRequestHandler
,
225 cal_method
="delete_image",
227 RPCParam("image_id"),
232 (r
"/api/get_image", CalRequestHandler
,
234 cal_method
="get_image",
236 RPCParam("image_id"),
239 RPCParam("image", "ImageInfoItem"),
244 (r
"/api/create_vm", CalRequestHandler
,
246 cal_method
="create_vm",
248 RPCParam("vm", "VMInfoItem"),
256 (r
"/api/start_vm", CalRequestHandler
,
258 cal_method
="start_vm",
265 (r
"/api/stop_vm", CalRequestHandler
,
267 cal_method
="stop_vm",
274 (r
"/api/delete_vm", CalRequestHandler
,
276 cal_method
="delete_vm",
283 (r
"/api/reboot_vm", CalRequestHandler
,
285 cal_method
="reboot_vm",
292 (r
"/api/get_vm_list", CalRequestHandler
,
294 cal_method
="get_vm_list",
296 RPCParam("vms", "VimResources"),
301 (r
"/api/get_vm", CalRequestHandler
,
308 RPCParam("vms", "VMInfoItem"),
313 (r
"/api/create_flavor", CalRequestHandler
,
315 cal_method
="create_flavor",
317 RPCParam("flavor", "FlavorInfoItem"),
320 RPCParam("flavor_id"),
325 (r
"/api/delete_flavor", CalRequestHandler
,
327 cal_method
="delete_flavor",
329 RPCParam("flavor_id"),
334 (r
"/api/get_flavor_list", CalRequestHandler
,
336 cal_method
="get_flavor_list",
338 RPCParam("flavors", "VimResources"),
343 (r
"/api/get_flavor", CalRequestHandler
,
345 cal_method
="get_flavor",
347 RPCParam("flavor_id"),
350 RPCParam("flavor", "FlavorInfoItem"),
355 (r
"/api/create_network", CalRequestHandler
,
357 cal_method
="create_network",
359 RPCParam("network", "NetworkInfoItem"),
362 RPCParam("network_id"),
367 (r
"/api/delete_network", CalRequestHandler
,
369 cal_method
="delete_network",
371 RPCParam("network_id"),
376 (r
"/api/get_network", CalRequestHandler
,
378 cal_method
="get_network",
380 RPCParam("network_id"),
383 RPCParam("network", "NetworkInfoItem"),
388 (r
"/api/get_network_list", CalRequestHandler
,
390 cal_method
="get_network_list",
392 RPCParam("networks", "VimResources"),
397 (r
"/api/get_management_network", CalRequestHandler
,
399 cal_method
="get_management_network",
401 RPCParam("network", "NetworkInfoItem"),
406 (r
"/api/create_port", CalRequestHandler
,
408 cal_method
="create_port",
410 RPCParam("port", "PortInfoItem"),
418 (r
"/api/delete_port", CalRequestHandler
,
420 cal_method
="delete_port",
427 (r
"/api/get_port", CalRequestHandler
,
429 cal_method
="get_port",
434 RPCParam("port", "PortInfoItem"),
439 (r
"/api/get_port_list", CalRequestHandler
,
441 cal_method
="get_port_list",
443 RPCParam("ports", "VimResources"),
448 (r
"/api/create_virtual_link", CalRequestHandler
,
450 cal_method
="create_virtual_link",
452 RPCParam("link_params", "VirtualLinkReqParams"),
460 (r
"/api/delete_virtual_link", CalRequestHandler
,
462 cal_method
="delete_virtual_link",
469 (r
"/api/get_virtual_link", CalRequestHandler
,
471 cal_method
="get_virtual_link",
476 RPCParam("response", "VirtualLinkInfoParams"),
481 (r
"/api/get_virtual_link_list", CalRequestHandler
,
483 cal_method
="get_virtual_link_list",
485 RPCParam("resources", "VNFResources"),
490 (r
"/api/create_vdu", CalRequestHandler
,
492 cal_method
="create_vdu",
494 RPCParam("vdu_params", "VDUInitParams"),
502 (r
"/api/modify_vdu", CalRequestHandler
,
504 cal_method
="modify_vdu",
506 RPCParam("vdu_params", "VDUModifyParams"),
511 (r
"/api/delete_vdu", CalRequestHandler
,
513 cal_method
="delete_vdu",
520 (r
"/api/get_vdu", CalRequestHandler
,
522 cal_method
="get_vdu",
527 RPCParam("response", "VDUInfoParams"),
532 (r
"/api/get_vdu_list", CalRequestHandler
,
534 cal_method
="get_vdu_list",
536 RPCParam("resources", "VNFResources"),
543 class RwCalProxyTasklet(rift
.tasklets
.Tasklet
):
547 def __init__(self
, *args
, **kwargs
):
548 super().__init
__(*args
, **kwargs
)
553 def get_cal_interface(self
):
554 if RwCalProxyTasklet
.cal_interface
is None:
555 plugin
= rw_peas
.PeasPlugin('rwcal_cloudsim', 'RwCal-1.0')
556 engine
, info
, extension
= plugin()
558 RwCalProxyTasklet
.cal_interface
= plugin
.get_interface("Cloud")
559 RwCalProxyTasklet
.cal_interface
.init(self
.log_hdl
)
561 return RwCalProxyTasklet
.cal_interface
564 """Tasklet entry point"""
565 self
.log
.setLevel(logging
.DEBUG
)
569 cal
= self
.get_cal_interface()
570 account
= RwcalYang
.CloudAccount(account_type
="cloudsim")
572 self
.app
= CalProxyApp(self
.log
, self
.loop
, cal
, account
)
573 self
._dts
= rift
.tasklets
.DTS(
575 RwcalYang
.get_schema(),
577 self
.on_dts_state_change
580 io_loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
581 self
.server
= tornado
.httpserver
.HTTPServer(
586 self
.log
.info("Starting Cal Proxy Http Server on port %s",
587 RwCalProxyTasklet
.HTTP_PORT
)
588 self
.server
.listen(RwCalProxyTasklet
.HTTP_PORT
)
595 print("Caught Exception in LP stop:", sys
.exc_info()[0])
607 def on_dts_state_change(self
, state
):
608 """Take action according to current dts state to transition
609 application into the corresponding application state
612 state - current dts state
616 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
617 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
621 rwdts
.State
.INIT
: self
.init
,
622 rwdts
.State
.RUN
: self
.run
,
625 # Transition application to next state
626 handler
= handlers
.get(state
, None)
627 if handler
is not None:
630 # Transition dts to next state
631 next_state
= switch
.get(state
, None)
632 if next_state
is not None:
633 self
._dts
.handle
.set_state(next_state
)