04a7cae311b4dfbf71ed7184437979d6f28d712f
[osm/SO.git] / rwlaunchpad / plugins / rwstagingmgr / rift / tasklets / rwstagingmgr / rwstagingmgr.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwstagingmgr.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 28-Sep-2016
21
22 """
23
24 import asyncio
25
26 import tornado
27 import tornado.httpserver
28 import tornado.httputil
29 import tornado.platform.asyncio
30 import tornadostreamform.multipart_streamer as multipart_streamer
31
32 import gi
33 gi.require_version('RwDts', '1.0')
34 gi.require_version('RwStagingMgmtYang', '1.0')
35 from gi.repository import (
36 RwDts as rwdts,
37 RwStagingMgmtYang)
38 import rift.tasklets
39
40 from . import rpc
41 from . import store
42 from .server import StagingApplication
43 from .publisher import StagingStorePublisher
44
45
46 class StagingManagerTasklet(rift.tasklets.Tasklet):
47 """Tasklet to handle all staging related operations
48 """
49 def __init__(self, *args, **kwargs):
50 try:
51 super().__init__(*args, **kwargs)
52 except Exception as e:
53 self.log.exception(e)
54
55 def start(self):
56 super().start()
57
58 self.log.debug("Registering with dts")
59
60 self.dts = rift.tasklets.DTS(
61 self.tasklet_info,
62 RwStagingMgmtYang.get_schema(),
63 self.loop,
64 self.on_dts_state_change
65 )
66
67 def stop(self):
68 try:
69 self.dts.deinit()
70 except Exception as e:
71 self.log.exception(e)
72
73 @asyncio.coroutine
74 def init(self):
75 self.store = store.StagingFileStore(log=self.log)
76 self.publisher = StagingStorePublisher(self.log, self.dts, self.loop)
77 # Fore recovery
78 self.publisher.delegate = self.store
79 # For create and delete events
80 self.store.delegate = self.publisher
81 yield from self.publisher.register()
82
83
84 io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
85 self.app = StagingApplication(self.store)
86
87 manifest = self.tasklet_info.get_pb_manifest()
88 ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
89 ssl_key = manifest.bootstrap_phase.rwsecurity.key
90 ssl_options = {"certfile": ssl_cert, "keyfile": ssl_key}
91
92 if manifest.bootstrap_phase.rwsecurity.use_ssl:
93 self.server = tornado.httpserver.HTTPServer(
94 self.app,
95 max_body_size=self.app.MAX_BODY_SIZE,
96 io_loop=io_loop,
97 ssl_options=ssl_options)
98 else:
99 self.server = tornado.httpserver.HTTPServer(
100 self.app,
101 max_body_size=self.app.MAX_BODY_SIZE,
102 io_loop=io_loop,
103 )
104
105 self.create_stg_rpc = rpc.StagingAreaCreateRpcHandler(
106 self.log,
107 self.dts,
108 self.loop,
109 self.store)
110
111 yield from self.create_stg_rpc.register()
112
113 @asyncio.coroutine
114 def run(self):
115 self.server.listen(self.app.PORT)
116
117 @asyncio.coroutine
118 def on_dts_state_change(self, state):
119 """Handle DTS state change
120
121 Take action according to current DTS state to transition application
122 into the corresponding application state
123
124 Arguments
125 state - current dts state
126
127 """
128 switch = {
129 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
130 rwdts.State.CONFIG: rwdts.State.RUN,
131 }
132
133 handlers = {
134 rwdts.State.INIT: self.init,
135 rwdts.State.RUN: self.run,
136 }
137
138 # Transition application to next state
139 handler = handlers.get(state, None)
140 if handler is not None:
141 yield from handler()
142
143 # Transition dts to next state
144 next_state = switch.get(state, None)
145 if next_state is not None:
146 self.dts.handle.set_state(next_state)