3 # Copyright 2016-2017 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
28 from gi
.repository
import (RwDts
as rwdts
, ProtobufC
)
30 from rift
.mano
.utils
.project
import (
31 get_add_delete_update_cfgs
,
34 from ..core
import DtsHandler
37 class SubscriberDtsHandler(DtsHandler
):
38 """A common class for all subscribers.
41 def from_project(cls
, proj
, callback
=None):
42 """Convenience method to build the object from tasklet
45 proj (rift.mano.utils.project.ManoProject): Project
46 callback (None, optional): Callable, which will be invoked on
49 Signature of callback:
51 msg: The Gi Object msg from DTS
52 action(rwdts.QueryAction): Action type
54 return cls(proj
.log
, proj
.dts
, proj
.loop
, proj
, callback
=callback
)
56 def __init__(self
, log
, dts
, loop
, project
, callback
=None):
57 super().__init
__(log
, dts
, loop
, project
)
58 self
.callback
= callback
60 def get_reg_flags(self
):
61 """Default set of REG flags, can be over-ridden by sub classes.
64 Set of rwdts.Flag types.
66 return rwdts
.Flag
.SUBSCRIBER|rwdts
.Flag
.DELTA_READY|rwdts
.Flag
.CACHE
70 class AbstractOpdataSubscriber(SubscriberDtsHandler
):
71 """Abstract class that simplifies the process of creating subscribers
74 Opdata subscriber can be created in one step by subclassing and implementing
75 the MANDATORY get_xpath() method
88 """Triggers the registration
92 self
._log
.warning("RPC already registered for project {}".
93 format(self
._project
.name
))
98 def on_commit(xact_info
):
99 xact_id
= xact_info
.handle
.get_xact().id
101 msg
, action
= xacts
.pop(xact_id
)
104 self
.callback(msg
, action
)
106 return rwdts
.MemberRspCode
.ACTION_OK
109 def on_prepare(xact_info
, action
, ks_path
, msg
):
111 # Defer all actions till the commit state.
112 xacts
[xact_info
.xact
.id] = (msg
, action
)
114 except Exception as e
:
115 self
.log
.exception(e
)
118 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
120 reg_event
= asyncio
.Event(loop
=self
.loop
)
123 def on_ready(_
, status
):
126 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
128 on_prepare
=on_prepare
,
132 self
.reg
= yield from self
.dts
.register(
133 xpath
=self
.project
.add_project(self
.get_xpath()),
134 flags
=self
.get_reg_flags(),
137 # yield from reg_event.wait()
139 assert self
.reg
is not None
142 class AbstractConfigSubscriber(SubscriberDtsHandler
):
143 """Abstract class that simplifies the process of creating subscribers
146 Config subscriber can be created in one step by subclassing and implementing
147 the MANDATORY get_xpath() method
162 """ Register for VNFD configuration"""
165 self
._log
.warning("RPC already registered for project {}".
166 format(self
._project
.name
))
169 def on_apply(dts
, acg
, xact
, action
, scratch
):
170 """Apply the configuration"""
171 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
174 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
175 dts_member_reg
=self
.reg
,
177 key_name
=self
.key_name())
179 [self
.callback(cfg
, rwdts
.QueryAction
.DELETE
)
180 for cfg
in delete_cfgs
if self
.callback
]
182 [self
.callback(cfg
, rwdts
.QueryAction
.CREATE
)
183 for cfg
in add_cfgs
if self
.callback
]
185 [self
.callback(cfg
, rwdts
.QueryAction
.UPDATE
)
186 for cfg
in update_cfgs
if self
.callback
]
189 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
190 """ on prepare callback """
191 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
193 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
194 with self
.dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
195 self
.reg
= acg
.register(
196 xpath
=self
.project
.add_project(self
.get_xpath()),
197 flags
=self
.get_reg_flags(),
198 on_prepare
=on_prepare
)