update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwstagingmgr / rift / tasklets / rwstagingmgr / rwstagingmgr.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwstagingmgr.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 28-Sep-2016
21
22 """
23
24 import asyncio
25
26 import tornado
27 import tornado.httpserver
28 import tornado.httputil
29 import tornado.platform.asyncio
30 import tornadostreamform.multipart_streamer as multipart_streamer
31
32 import gi
33 gi.require_version('RwDts', '1.0')
34 gi.require_version('RwStagingMgmtYang', '1.0')
35 gi.require_version('rwlib', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwStagingMgmtYang)
40 import rift.tasklets
41 from rift.mano.utils.project import (
42 ManoProject,
43 ProjectHandler,
44 )
45 import gi.repository.rwlib as rwlib
46
47 from . import rpc
48 from . import store
49 from .server import StagingApplication
50 from .publisher import StagingStorePublisher
51
52
53 class StagingManagerProject(ManoProject):
54
55 def __init__(self, name, tasklet, **kw):
56 super(StagingManagerProject, self).__init__(tasklet.log, name)
57 self.update(tasklet)
58
59 self.publisher = StagingStorePublisher(self)
60 # For recovery
61 self.publisher.delegate = tasklet.store
62
63 @asyncio.coroutine
64 def register (self):
65 yield from self.publisher.register()
66
67 def deregister(self):
68 self.publisher.deregister()
69
70
71 class StagingManagerTasklet(rift.tasklets.Tasklet):
72 """Tasklet to handle all staging related operations
73 """
74 def __init__(self, *args, **kwargs):
75 try:
76 super().__init__(*args, **kwargs)
77 self._project_handler = None
78 self.projects = {}
79
80 except Exception as e:
81 self.log.exception("Staging Manager tasklet init: {}".
82 format(e))
83
84 def start(self):
85 super().start()
86
87 self.log.debug("Registering with dts")
88
89 self.dts = rift.tasklets.DTS(
90 self.tasklet_info,
91 RwStagingMgmtYang.get_schema(),
92 self.loop,
93 self.on_dts_state_change
94 )
95
96 def stop(self):
97 try:
98 self.dts.deinit()
99 except Exception as e:
100 self.log.exception(e)
101
102 @asyncio.coroutine
103 def init(self):
104 self.store = store.StagingFileStore(self)
105
106 io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
107 self.app = StagingApplication(self.store, self.loop)
108
109 manifest = self.tasklet_info.get_pb_manifest()
110 ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
111 ssl_key = manifest.bootstrap_phase.rwsecurity.key
112 ssl_options = {"certfile": ssl_cert, "keyfile": ssl_key}
113
114 if manifest.bootstrap_phase.rwsecurity.use_ssl:
115 self.server = tornado.httpserver.HTTPServer(
116 self.app,
117 max_body_size=self.app.MAX_BODY_SIZE,
118 io_loop=io_loop,
119 ssl_options=ssl_options)
120 else:
121 self.server = tornado.httpserver.HTTPServer(
122 self.app,
123 max_body_size=self.app.MAX_BODY_SIZE,
124 io_loop=io_loop,
125 )
126
127 self.create_stg_rpc = rpc.StagingAreaCreateRpcHandler(
128 self.log,
129 self.dts,
130 self.loop,
131 self.store)
132 yield from self.create_stg_rpc.register()
133
134 self.log.debug("creating project handler")
135 self.project_handler = ProjectHandler(self, StagingManagerProject)
136 self.project_handler.register()
137
138 @asyncio.coroutine
139 def run(self):
140 address = rwlib.getenv("RWVM_INTERNAL_IPADDR")
141 if (address is None):
142 address=""
143 self.server.listen(self.app.PORT, address=address)
144 self.server.listen(self.app.PORT, address="127.0.0.1")
145
146 @asyncio.coroutine
147 def on_dts_state_change(self, state):
148 """Handle DTS state change
149
150 Take action according to current DTS state to transition application
151 into the corresponding application state
152
153 Arguments
154 state - current dts state
155
156 """
157 switch = {
158 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
159 rwdts.State.CONFIG: rwdts.State.RUN,
160 }
161
162 handlers = {
163 rwdts.State.INIT: self.init,
164 rwdts.State.RUN: self.run,
165 }
166
167 # Transition application to next state
168 handler = handlers.get(state, None)
169 if handler is not None:
170 yield from handler()
171
172 # Transition dts to next state
173 next_state = switch.get(state, None)
174 if next_state is not None:
175 self.dts.handle.set_state(next_state)