Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwstagingmgr / rift / tasklets / rwstagingmgr / rwstagingmgr.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwstagingmgr.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 28-Sep-2016
21
22 """
23
24 import asyncio
25
26 import tornado
27 import tornado.httpserver
28 import tornado.httputil
29 import tornado.platform.asyncio
30 import tornadostreamform.multipart_streamer as multipart_streamer
31
32 import gi
33 gi.require_version('RwDts', '1.0')
34 gi.require_version('RwStagingMgmtYang', '1.0')
35 from gi.repository import (
36 RwDts as rwdts,
37 RwStagingMgmtYang)
38 import rift.tasklets
39 from rift.mano.utils.project import (
40 ManoProject,
41 ProjectHandler,
42 )
43
44 from . import rpc
45 from . import store
46 from .server import StagingApplication
47 from .publisher import StagingStorePublisher
48
49
class StagingManagerProject(ManoProject):
    """Per-project state of the staging manager.

    Each instance owns a StagingStorePublisher bound to this project and
    forwards register/deregister requests to it.
    """

    def __init__(self, name, tasklet, **kw):
        """Set up the project and its staging store publisher.

        Arguments
            name    - project name, passed to ManoProject
            tasklet - owning tasklet; supplies the logger and the store
            kw      - accepted for interface compatibility, not used here
        """
        super().__init__(tasklet.log, name)
        self.update(tasklet)

        self.publisher = store_publisher = StagingStorePublisher(self)
        # The tasklet's store is the publisher's delegate (for recovery)
        store_publisher.delegate = tasklet.store

    @asyncio.coroutine
    def register(self):
        """Register this project's publisher."""
        yield from self.publisher.register()

    def deregister(self):
        """Deregister this project's publisher."""
        self.publisher.deregister()
66
67
class StagingManagerTasklet(rift.tasklets.Tasklet):
    """Tasklet to handle all staging related operations

    Lifecycle as driven by this class:
      start()  - creates the DTS handle with on_dts_state_change as callback
      init()   - builds the file store, the HTTP application/server, the
                 staging-area-create RPC handler and the project handler
      run()    - starts listening on the application's port
      stop()   - stops the HTTP server (if created) and de-initialises DTS
    """
    def __init__(self, *args, **kwargs):
        """Initialise the base tasklet and the per-project bookkeeping.

        Any exception raised during initialisation is logged and not
        re-raised (best-effort behaviour kept from the original code).
        """
        try:
            super().__init__(*args, **kwargs)
            # NOTE(review): init() assigns self.project_handler (no leading
            # underscore); this placeholder is not read in this class --
            # confirm whether the two names are meant to match.
            self._project_handler = None
            self.projects = {}

        except Exception as e:
            self.log.exception("Staging Manager tasklet init: {}".
                               format(e))

    def start(self):
        """Start the tasklet and create the DTS handle.

        The DTS object is given on_dts_state_change as its state-change
        callback, which in turn drives init() and run().
        """
        super().start()

        self.log.debug("Registering with dts")

        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwStagingMgmtYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

    def stop(self):
        """Stop the HTTP server (if any) and de-initialise DTS.

        Each teardown step is attempted independently; a failure in one is
        logged and does not prevent the other from running.
        """
        # self.server only exists once init() has run, so look it up
        # defensively: stop() may be called before DTS reached INIT.
        server = getattr(self, "server", None)
        if server is not None:
            try:
                # Close the listening socket(s) so the port is released.
                server.stop()
            except Exception as e:
                self.log.exception(e)

        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """Create the store, HTTP server, RPC handler and project handler.

        The HTTP server is created here but only starts listening in run().
        SSL options are taken from the tasklet manifest and are applied only
        when the manifest's rwsecurity.use_ssl flag is set.
        """
        self.store = store.StagingFileStore(self)

        io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
        self.app = StagingApplication(self.store)

        manifest = self.tasklet_info.get_pb_manifest()
        security = manifest.bootstrap_phase.rwsecurity

        # Arguments common to both the plain and the SSL server.
        server_kwargs = {
            "max_body_size": self.app.MAX_BODY_SIZE,
            "io_loop": io_loop,
        }
        if security.use_ssl:
            server_kwargs["ssl_options"] = {
                "certfile": security.cert,
                "keyfile": security.key,
            }

        self.server = tornado.httpserver.HTTPServer(self.app, **server_kwargs)

        self.create_stg_rpc = rpc.StagingAreaCreateRpcHandler(
            self.log,
            self.dts,
            self.loop,
            self.store)
        yield from self.create_stg_rpc.register()

        self.log.debug("creating project handler")
        self.project_handler = ProjectHandler(self, StagingManagerProject)
        self.project_handler.register()

    @asyncio.coroutine
    def run(self):
        """Start accepting HTTP connections on the application's port."""
        self.server.listen(self.app.PORT)

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        # DTS state to request after the current state has been handled.
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application coroutine to run on entering a given DTS state.
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.dts.handle.set_state(next_state)