3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
27 import tornado
.httpserver
28 import tornado
.httputil
29 import tornado
.platform
.asyncio
30 import tornadostreamform
.multipart_streamer
as multipart_streamer
33 gi
.require_version('RwDts', '1.0')
34 gi
.require_version('RwStagingMgmtYang', '1.0')
35 from gi
.repository
import (
42 from .server
import StagingApplication
43 from .publisher
import StagingStorePublisher
46 class StagingManagerTasklet(rift
.tasklets
.Tasklet
):
47 """Tasklet to handle all staging related operations
49 def __init__(self
, *args
, **kwargs
):
51 super().__init
__(*args
, **kwargs
)
52 except Exception as e
:
58 self
.log
.debug("Registering with dts")
60 self
.dts
= rift
.tasklets
.DTS(
62 RwStagingMgmtYang
.get_schema(),
64 self
.on_dts_state_change
70 except Exception as e
:
75 self
.store
= store
.StagingFileStore(log
=self
.log
)
76 self
.publisher
= StagingStorePublisher(self
.log
, self
.dts
, self
.loop
)
78 self
.publisher
.delegate
= self
.store
79 # For create and delete events
80 self
.store
.delegate
= self
.publisher
81 yield from self
.publisher
.register()
84 io_loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
85 self
.app
= StagingApplication(self
.store
)
87 manifest
= self
.tasklet_info
.get_pb_manifest()
88 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
89 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
90 ssl_options
= {"certfile": ssl_cert
, "keyfile": ssl_key
}
92 if manifest
.bootstrap_phase
.rwsecurity
.use_ssl
:
93 self
.server
= tornado
.httpserver
.HTTPServer(
95 max_body_size
=self
.app
.MAX_BODY_SIZE
,
97 ssl_options
=ssl_options
)
99 self
.server
= tornado
.httpserver
.HTTPServer(
101 max_body_size
=self
.app
.MAX_BODY_SIZE
,
105 self
.create_stg_rpc
= rpc
.StagingAreaCreateRpcHandler(
111 yield from self
.create_stg_rpc
.register()
115 self
.server
.listen(self
.app
.PORT
)
118 def on_dts_state_change(self
, state
):
119 """Handle DTS state change
121 Take action according to current DTS state to transition application
122 into the corresponding application state
125 state - current dts state
129 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
130 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
134 rwdts
.State
.INIT
: self
.init
,
135 rwdts
.State
.RUN
: self
.run
,
138 # Transition application to next state
139 handler
= handlers
.get(state
, None)
140 if handler
is not None:
143 # Transition dts to next state
144 next_state
= switch
.get(state
, None)
145 if next_state
is not None:
146 self
.dts
.handle
.set_state(next_state
)