bb2c355c1823bd81a82679a2c7b50305a97b7301
[osm/SO.git] / rwcal / plugins / rwcalproxytasklet / rift / tasklets / rwcalproxytasklet / rwcalproxytasklet.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwcalproxytasklet.py
19 @author Austin Cormier(austin.cormier@riftio.com)
20 @date 2015-10-20
21 """
22
23 import asyncio
24 import collections
25 import concurrent.futures
26 import logging
27 import os
28 import sys
29
30 import tornado
31 import tornado.httpserver
32 import tornado.web
33 import tornado.platform.asyncio
34
35 import gi
36 gi.require_version('RwDts', '1.0')
37 gi.require_version('RwcalYang', '1.0')
38 gi.require_version('RwTypes', '1.0')
39 gi.require_version('RwCal', '1.0')
40
41 from gi.repository import (
42 RwDts as rwdts,
43 RwcalYang,
44 RwTypes,
45 )
46
47 import rw_peas
48 import rift.tasklets
49
# Compatibility shim: asyncio.ensure_future() was added in Python 3.4.4; on
# older interpreters the same function is exposed under the name "async".
# The legacy name is looked up with getattr() because ``async`` is a reserved
# keyword from Python 3.7 onwards, so spelling it as an attribute access
# (``asyncio.async``) would make this whole module fail to parse there.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
52
53
class CalCallFailure(Exception):
    """Raised when a proxied CAL call reports a status other than SUCCESS."""
56
57
class RPCParam(object):
    """Description of a single named parameter of a proxied CAL RPC.

    Attributes:
        key - name of the parameter in the JSON request/response
        proto_type - name of the RwcalYang class the value is converted
                     to/from, or None when the value is passed through as-is
    """

    def __init__(self, key, proto_type=None):
        self.key, self.proto_type = key, proto_type
62
63
class CalRequestHandler(tornado.web.RequestHandler):
    """Tornado handler that proxies one HTTP POST endpoint to one CAL method.

    The JSON request body is decoded into the CAL method's arguments, the
    method is run on the configured executor, and its return values are
    written back as a JSON document of the form::

        {"return_vals": [{"key": ..., "value": ..., "proto_type": ...}, ...]}
    """

    def initialize(self, log, loop, cal, account, executor, cal_method,
                   input_params=None, output_params=None):
        """Store the per-route configuration supplied by the application.

        Arguments:
            log - logger used for debug/error messages
            loop - asyncio event loop the CAL call is scheduled from
            cal - object on which the method named cal_method is looked up
            account - passed as the first argument of every CAL call
            executor - executor on which the blocking CAL call is run
            cal_method - name of the CAL method this route invokes
            input_params - list of RPCParam expected in the request body,
                           or None when the method takes no extra arguments
            output_params - list of RPCParam describing the values returned
                            after the status, or None when there are none
        """
        self.log = log
        self.loop = loop
        self.cal = cal
        self.account = account
        self.executor = executor
        self.cal_method = cal_method
        self.input_params = input_params
        self.output_params = output_params

    def wrap_status_fn(self, fn, *args, **kwargs):
        """Call fn and strip the leading RwStatus from its return value.

        Arguments:
            fn - callable whose first (or only) return value is a RwStatus
            args, kwargs - forwarded to fn unchanged

        Returns:
            The return values that follow the status (empty when fn returned
            only a status).

        Raises:
            ValueError - the first return value is not a RwStatus
            CalCallFailure - the status is not RwStatus.SUCCESS
        """
        # The Iterable ABC lives in collections.abc; the alias under
        # ``collections`` was removed in Python 3.10.  Imported here so the
        # block does not rely on the submodule having been loaded elsewhere.
        import collections.abc

        ret = fn(*args, **kwargs)
        if not isinstance(ret, collections.abc.Iterable):
            # A bare status: normalise to a one-element sequence.
            ret = [ret]

        rw_status = ret[0]
        if type(rw_status) is not RwTypes.RwStatus:
            raise ValueError("First return value of %s function was not a RwStatus" %
                             fn.__name__)

        if rw_status != RwTypes.RwStatus.SUCCESS:
            msg = "%s returned %s" % (fn.__name__, str(rw_status))
            self.log.error(msg)
            raise CalCallFailure(msg)

        return ret[1:]

    @tornado.gen.coroutine
    def post(self):
        """Handle a POST by invoking the configured CAL method.

        Responds with status 200 and the JSON-encoded return values.  Any
        exception raised while decoding the request, calling CAL or encoding
        the result propagates out of the coroutine.
        """

        def body_to_cal_args():
            """Decode the JSON request body into positional CAL arguments."""
            cal_args = []
            if self.input_params is None:
                return cal_args

            input_dict = tornado.escape.json_decode(self.request.body)
            if len(input_dict) != len(self.input_params):
                raise ValueError("Got %s parameters, expected %s" %
                                 (len(input_dict), len(self.input_params)))

            # Arguments are emitted in input_params order, not body order.
            for input_param in self.input_params:
                key = input_param.key
                value = input_dict[key]
                proto_type = input_param.proto_type

                if proto_type is not None:
                    proto_cls = getattr(RwcalYang, proto_type)
                    self.log.debug("Deserializing into %s type", proto_cls)
                    value = proto_cls.from_dict(value)

                cal_args.append(value)

            return cal_args

        def cal_return_vals(return_vals):
            """Pair the CAL return values with output_params for the reply."""
            output_params = self.output_params
            if output_params is None:
                output_params = []

            if len(return_vals) != len(output_params):
                # Interpolate with %: passing the values as extra exception
                # arguments would leave the placeholders unformatted.
                raise ValueError("Got %s return values. Expected %s" %
                                 (len(return_vals), len(output_params)))

            write_dict = {"return_vals": []}
            for output_param, output_value in zip(output_params, return_vals):
                proto_type = output_param.proto_type

                if proto_type is not None:
                    output_value = output_value.as_dict()

                write_dict["return_vals"].append({
                    "key": output_param.key,
                    "value": output_value,
                    "proto_type": proto_type,
                })

            return write_dict

        @asyncio.coroutine
        def handle_request():
            self.log.debug("Got cloudsimproxy POST request: %s", self.request.body)
            cal_args = body_to_cal_args()

            # Execute the CAL request in a separate thread to prevent
            # blocking the main loop.
            return_vals = yield from self.loop.run_in_executor(
                self.executor,
                self.wrap_status_fn,
                getattr(self.cal, self.cal_method),
                self.account,
                *cal_args
            )

            return cal_return_vals(return_vals)

        # Run the asyncio coroutine on the tasklet loop and bridge its result
        # back into the tornado coroutine.
        f = asyncio.ensure_future(handle_request(), loop=self.loop)
        return_dict = yield tornado.platform.asyncio.to_tornado_future(f)

        self.log.debug("Responding to %s RPC with %s", self.cal_method, return_dict)

        self.clear()
        self.set_status(200)
        self.write(return_dict)
172
173
class CalProxyApp(tornado.web.Application):
    """Tornado application exposing each CAL method as POST /api/<method>.

    Every route is served by CalRequestHandler, configured with the name of
    the CAL method plus the RPCParam lists describing its inputs and outputs.
    """

    def __init__(self, log, loop, cal_interface, cal_account):
        """Build the routing table.

        Arguments:
            log - logger handed to every request handler
            loop - asyncio event loop handed to every request handler
            cal_interface - CAL object whose methods the routes invoke
            cal_account - account passed as first argument of each CAL call
        """
        self.log = log
        self.loop = loop
        self.cal = cal_interface
        self.account = cal_account

        # Handler arguments common to every route.  The executor has a
        # single worker to prevent having multiple simultaneous calls into
        # CAL (which is not threadsafe).
        shared_kwargs = {
            "log": self.log,
            "loop": self.loop,
            "cal": cal_interface,
            "account": cal_account,
            "executor": concurrent.futures.ThreadPoolExecutor(1),
        }

        def route(cal_method, inputs=None, outputs=None):
            """Return the (url, handler, kwargs) routing entry for cal_method."""
            handler_kwargs = dict(
                cal_method=cal_method,
                input_params=inputs,
                output_params=outputs,
            )
            handler_kwargs.update(shared_kwargs)
            return (r"/api/" + cal_method, CalRequestHandler, handler_kwargs)

        routes = [
            # Images
            route("get_image_list",
                  outputs=[RPCParam("images", "VimResources")]),
            route("create_image",
                  inputs=[RPCParam("image", "ImageInfoItem")],
                  outputs=[RPCParam("image_id")]),
            route("delete_image",
                  inputs=[RPCParam("image_id")]),
            route("get_image",
                  inputs=[RPCParam("image_id")],
                  outputs=[RPCParam("image", "ImageInfoItem")]),

            # VMs
            route("create_vm",
                  inputs=[RPCParam("vm", "VMInfoItem")],
                  outputs=[RPCParam("vm_id")]),
            route("start_vm",
                  inputs=[RPCParam("vm_id")]),
            route("stop_vm",
                  inputs=[RPCParam("vm_id")]),
            route("delete_vm",
                  inputs=[RPCParam("vm_id")]),
            route("reboot_vm",
                  inputs=[RPCParam("vm_id")]),
            route("get_vm_list",
                  outputs=[RPCParam("vms", "VimResources")]),
            route("get_vm",
                  inputs=[RPCParam("vm_id")],
                  outputs=[RPCParam("vms", "VMInfoItem")]),

            # Flavors
            route("create_flavor",
                  inputs=[RPCParam("flavor", "FlavorInfoItem")],
                  outputs=[RPCParam("flavor_id")]),
            route("delete_flavor",
                  inputs=[RPCParam("flavor_id")]),
            route("get_flavor_list",
                  outputs=[RPCParam("flavors", "VimResources")]),
            route("get_flavor",
                  inputs=[RPCParam("flavor_id")],
                  outputs=[RPCParam("flavor", "FlavorInfoItem")]),

            # Networks
            route("create_network",
                  inputs=[RPCParam("network", "NetworkInfoItem")],
                  outputs=[RPCParam("network_id")]),
            route("delete_network",
                  inputs=[RPCParam("network_id")]),
            route("get_network",
                  inputs=[RPCParam("network_id")],
                  outputs=[RPCParam("network", "NetworkInfoItem")]),
            route("get_network_list",
                  outputs=[RPCParam("networks", "VimResources")]),
            route("get_management_network",
                  outputs=[RPCParam("network", "NetworkInfoItem")]),

            # Ports
            route("create_port",
                  inputs=[RPCParam("port", "PortInfoItem")],
                  outputs=[RPCParam("port_id")]),
            route("delete_port",
                  inputs=[RPCParam("port_id")]),
            route("get_port",
                  inputs=[RPCParam("port_id")],
                  outputs=[RPCParam("port", "PortInfoItem")]),
            route("get_port_list",
                  outputs=[RPCParam("ports", "VimResources")]),

            # Virtual links
            route("create_virtual_link",
                  inputs=[RPCParam("link_params", "VirtualLinkReqParams")],
                  outputs=[RPCParam("link_id")]),
            route("delete_virtual_link",
                  inputs=[RPCParam("link_id")]),
            route("get_virtual_link",
                  inputs=[RPCParam("link_id")],
                  outputs=[RPCParam("response", "VirtualLinkInfoParams")]),
            route("get_virtual_link_list",
                  outputs=[RPCParam("resources", "VNFResources")]),

            # VDUs
            route("create_vdu",
                  inputs=[RPCParam("vdu_params", "VDUInitParams")],
                  outputs=[RPCParam("vdu_id")]),
            route("modify_vdu",
                  inputs=[RPCParam("vdu_params", "VDUModifyParams")]),
            route("delete_vdu",
                  inputs=[RPCParam("vdu_id")]),
            route("get_vdu",
                  inputs=[RPCParam("vdu_id")],
                  outputs=[RPCParam("response", "VDUInfoParams")]),
            route("get_vdu_list",
                  outputs=[RPCParam("resources", "VNFResources")]),
        ]

        super(CalProxyApp, self).__init__(routes)
541
542
class RwCalProxyTasklet(rift.tasklets.Tasklet):
    """Tasklet that serves the CAL proxy HTTP API for a cloudsim account."""

    # TCP port the proxy HTTP server listens on.
    HTTP_PORT = 9002
    # CAL interface, created on first use and cached on the class.
    cal_interface = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Both are populated by start().
        self.app = None
        self.server = None

    def get_cal_interface(self):
        """Return the cached CAL interface, creating it on first call.

        The interface is stored on the class, so every instance shares the
        same object once it has been created.
        """
        if RwCalProxyTasklet.cal_interface is None:
            plugin = rw_peas.PeasPlugin('rwcal_cloudsim', 'RwCal-1.0')
            # The values returned by the call are not needed here.
            # NOTE(review): presumably the call itself loads the plugin
            # before get_interface() is used - confirm against rw_peas.
            plugin()

            RwCalProxyTasklet.cal_interface = plugin.get_interface("Cloud")
            RwCalProxyTasklet.cal_interface.init(self.log_hdl)

        return RwCalProxyTasklet.cal_interface

    def start(self):
        """Tasklet entry point"""
        self.log.setLevel(logging.DEBUG)

        super().start()

        cal = self.get_cal_interface()
        account = RwcalYang.CloudAccount(account_type="cloudsim")

        self.app = CalProxyApp(self.log, self.loop, cal, account)
        self._dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwcalYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        # Run the tornado server on top of the tasklet's asyncio loop.
        io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
        self.server = tornado.httpserver.HTTPServer(
            self.app,
            io_loop=io_loop,
        )

        self.log.info("Starting Cal Proxy Http Server on port %s",
                      RwCalProxyTasklet.HTTP_PORT)
        self.server.listen(RwCalProxyTasklet.HTTP_PORT)

    def stop(self):
        """Stop the HTTP server and release the DTS handle.

        Safe to call on a tasklet that was never started: the server and
        DTS handle are only touched if start() actually created them.  Any
        exception raised while stopping is reported and re-raised.
        """
        try:
            if self.server is not None:
                self.server.stop()

            # _dts is only assigned in start(), so it may not exist yet.
            dts = getattr(self, "_dts", None)
            if dts is not None:
                dts.deinit()
        except Exception:
            print("Caught Exception in LP stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """Application hook for the DTS INIT state; nothing to do."""
        pass

    @asyncio.coroutine
    def run(self):
        """Application hook for the DTS RUN state; nothing to do."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """

        # DTS state to request after the given state has been handled.
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application coroutine to run on entering the given state.
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self._dts.handle.set_state(next_state)