X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwstagingmgr%2Frift%2Ftasklets%2Frwstagingmgr%2Frwstagingmgr.py;fp=rwlaunchpad%2Fplugins%2Frwstagingmgr%2Frift%2Ftasklets%2Frwstagingmgr%2Frwstagingmgr.py;h=04a7cae311b4dfbf71ed7184437979d6f28d712f;hb=07b439824b5eac4dc760ce56b52fbdcf5539db4c;hp=0000000000000000000000000000000000000000;hpb=39b275b4ba816363b2dcdf403f70daffdeddabcc;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py new file mode 100644 index 00000000..04a7cae3 --- /dev/null +++ b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py @@ -0,0 +1,146 @@ +""" +# +# Copyright 2016 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +@file rwstagingmgr.py +@author Varun Prasad (varun.prasad@riftio.com) +@date 28-Sep-2016 + +""" + +import asyncio + +import tornado +import tornado.httpserver +import tornado.httputil +import tornado.platform.asyncio +import tornadostreamform.multipart_streamer as multipart_streamer + +import gi +gi.require_version('RwDts', '1.0') +gi.require_version('RwStagingMgmtYang', '1.0') +from gi.repository import ( + RwDts as rwdts, + RwStagingMgmtYang) +import rift.tasklets + +from . import rpc +from . import store +from .server import StagingApplication +from .publisher import StagingStorePublisher + + +class StagingManagerTasklet(rift.tasklets.Tasklet): + """Tasklet to handle all staging related operations + """ + def __init__(self, *args, **kwargs): + try: + super().__init__(*args, **kwargs) + except Exception as e: + self.log.exception(e) + + def start(self): + super().start() + + self.log.debug("Registering with dts") + + self.dts = rift.tasklets.DTS( + self.tasklet_info, + RwStagingMgmtYang.get_schema(), + self.loop, + self.on_dts_state_change + ) + + def stop(self): + try: + self.dts.deinit() + except Exception as e: + self.log.exception(e) + + @asyncio.coroutine + def init(self): + self.store = store.StagingFileStore(log=self.log) + self.publisher = StagingStorePublisher(self.log, self.dts, self.loop) + # Fore recovery + self.publisher.delegate = self.store + # For create and delete events + self.store.delegate = self.publisher + yield from self.publisher.register() + + + io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop) + self.app = StagingApplication(self.store) + + manifest = self.tasklet_info.get_pb_manifest() + ssl_cert = manifest.bootstrap_phase.rwsecurity.cert + ssl_key = manifest.bootstrap_phase.rwsecurity.key + ssl_options = {"certfile": ssl_cert, "keyfile": ssl_key} + + if manifest.bootstrap_phase.rwsecurity.use_ssl: + self.server = tornado.httpserver.HTTPServer( + self.app, + max_body_size=self.app.MAX_BODY_SIZE, + io_loop=io_loop, + ssl_options=ssl_options) + else: + self.server = tornado.httpserver.HTTPServer( + self.app, + max_body_size=self.app.MAX_BODY_SIZE, + io_loop=io_loop, + ) + + self.create_stg_rpc = rpc.StagingAreaCreateRpcHandler( + self.log, + self.dts, + self.loop, + self.store) + + yield from self.create_stg_rpc.register() + + @asyncio.coroutine + def run(self): + self.server.listen(self.app.PORT) + + @asyncio.coroutine + def on_dts_state_change(self, state): + """Handle DTS state change + + Take action according to current DTS state to transition application + into the corresponding application state + + Arguments + state - current dts state + + """ + switch = { + rwdts.State.INIT: rwdts.State.REGN_COMPLETE, + rwdts.State.CONFIG: rwdts.State.RUN, + } + + handlers = { + rwdts.State.INIT: self.init, + rwdts.State.RUN: self.run, + } + + # Transition application to next state + handler = handlers.get(state, None) + if handler is not None: + yield from handler() + + # Transition dts to next state + next_state = switch.get(state, None) + if next_state is not None: + self.dts.handle.set_state(next_state)