3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
27 import tornado
.httpserver
28 import tornado
.httputil
29 import tornado
.platform
.asyncio
30 import tornadostreamform
.multipart_streamer
as multipart_streamer
33 gi
.require_version('RwDts', '1.0')
34 gi
.require_version('RwStagingMgmtYang', '1.0')
35 gi
.require_version('rwlib', '1.0')
37 from gi
.repository
import (
41 from rift
.mano
.utils
.project
import (
45 import gi
.repository
.rwlib
as rwlib
49 from .server
import StagingApplication
50 from .publisher
import StagingStorePublisher
53 class StagingManagerProject(ManoProject
):
55 def __init__(self
, name
, tasklet
, **kw
):
56 super(StagingManagerProject
, self
).__init
__(tasklet
.log
, name
)
59 self
.publisher
= StagingStorePublisher(self
)
61 self
.publisher
.delegate
= tasklet
.store
65 yield from self
.publisher
.register()
68 self
.publisher
.deregister()
71 class StagingManagerTasklet(rift
.tasklets
.Tasklet
):
72 """Tasklet to handle all staging related operations
74 def __init__(self
, *args
, **kwargs
):
76 super().__init
__(*args
, **kwargs
)
77 self
._project
_handler
= None
80 except Exception as e
:
81 self
.log
.exception("Staging Manager tasklet init: {}".
87 self
.log
.debug("Registering with dts")
89 self
.dts
= rift
.tasklets
.DTS(
91 RwStagingMgmtYang
.get_schema(),
93 self
.on_dts_state_change
99 except Exception as e
:
100 self
.log
.exception(e
)
104 self
.store
= store
.StagingFileStore(self
)
106 io_loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
107 self
.app
= StagingApplication(self
.store
, self
.loop
)
109 manifest
= self
.tasklet_info
.get_pb_manifest()
110 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
111 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
112 ssl_options
= {"certfile": ssl_cert
, "keyfile": ssl_key
}
114 if manifest
.bootstrap_phase
.rwsecurity
.use_ssl
:
115 self
.server
= tornado
.httpserver
.HTTPServer(
117 max_body_size
=self
.app
.MAX_BODY_SIZE
,
119 ssl_options
=ssl_options
)
121 self
.server
= tornado
.httpserver
.HTTPServer(
123 max_body_size
=self
.app
.MAX_BODY_SIZE
,
127 self
.create_stg_rpc
= rpc
.StagingAreaCreateRpcHandler(
132 yield from self
.create_stg_rpc
.register()
134 self
.log
.debug("creating project handler")
135 self
.project_handler
= ProjectHandler(self
, StagingManagerProject
)
136 self
.project_handler
.register()
140 address
= rwlib
.getenv("RWVM_INTERNAL_IPADDR")
141 if (address
is None):
143 self
.server
.listen(self
.app
.PORT
, address
=address
)
144 self
.server
.listen(self
.app
.PORT
, address
="127.0.0.1")
147 def on_dts_state_change(self
, state
):
148 """Handle DTS state change
150 Take action according to current DTS state to transition application
151 into the corresponding application state
154 state - current dts state
158 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
159 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
163 rwdts
.State
.INIT
: self
.init
,
164 rwdts
.State
.RUN
: self
.run
,
167 # Transition application to next state
168 handler
= handlers
.get(state
, None)
169 if handler
is not None:
172 # Transition dts to next state
173 next_state
= switch
.get(state
, None)
174 if next_state
is not None:
175 self
.dts
.handle
.set_state(next_state
)