3 # Copyright 2016-2017 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
28 from gi
.repository
import (RwDts
as rwdts
, ProtobufC
)
30 from rift
.mano
.utils
.project
import (
31 get_add_delete_update_cfgs
,
34 from ..core
import DtsHandler
37 class SubscriberDtsHandler(DtsHandler
):
38 """A common class for all subscribers.
41 def from_project(cls
, proj
, callback
=None):
42 """Convenience method to build the object from tasklet
45 proj (rift.mano.utils.project.ManoProject): Project
46 callback (None, optional): Callable, which will be invoked on
49 Signature of callback:
51 msg: The Gi Object msg from DTS
52 action(rwdts.QueryAction): Action type
54 return cls(proj
.log
, proj
.dts
, proj
.loop
, proj
, callback
=callback
)
56 def __init__(self
, log
, dts
, loop
, project
, callback
=None):
57 super().__init
__(log
, dts
, loop
, project
)
58 self
.callback
= callback
68 def get_reg_flags(self
):
69 """Default set of REG flags, can be over-ridden by sub classes.
72 Set of rwdts.Flag types.
74 return rwdts
.Flag
.SUBSCRIBER|rwdts
.Flag
.DELTA_READY|rwdts
.Flag
.CACHE
78 itr
= yield from self
.dts
.query_read(
83 result
= yield from res
84 result
= result
.result
91 class AbstractOpdataSubscriber(SubscriberDtsHandler
):
92 """Abstract class that simplifies the process of creating subscribers
95 Opdata subscriber can be created in one step by subclassing and implementing
96 the MANDATORY get_xpath() method
102 """Triggers the registration
106 self
._log
.warning("RPC already registered for project {}".
107 format(self
._project
.name
))
112 def on_commit(xact_info
):
114 xact_id
= xact_info
.handle
.get_xact().id
116 msg
, action
= xacts
.pop(xact_id
)
119 self
.callback(msg
, action
)
120 except Exception as e
:
121 self
.log
.error("Exception when committing data for registration:{} exception:{}".format(self
.get_xpath(), e
))
122 self
.log
.exception(e
)
124 return rwdts
.MemberRspCode
.ACTION_OK
127 def on_prepare(xact_info
, action
, ks_path
, msg
):
129 # Defer all actions till the commit state.
130 xacts
[xact_info
.xact
.id] = (msg
, action
)
132 except Exception as e
:
133 self
.log
.exception(e
)
136 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
137 except rift
.tasklets
.dts
.ResponseError
as e
:
138 self
._log
.warning("Reg handle is None during action {} for {}: {}".
139 format(action
, self
.__class
__, e
))
141 reg_event
= asyncio
.Event(loop
=self
.loop
)
144 def on_ready(_
, status
):
147 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
149 on_prepare
=on_prepare
,
153 self
._reg
= yield from self
.dts
.register(
154 xpath
=self
.project
.add_project(self
.get_xpath()),
155 flags
=self
.get_reg_flags(),
158 # yield from reg_event.wait()
160 assert self
._reg
is not None
163 class AbstractConfigSubscriber(SubscriberDtsHandler
):
164 """Abstract class that simplifies the process of creating subscribers
167 Config subscriber can be created in one step by subclassing and implementing
168 the MANDATORY get_xpath() method
183 """ Register for VNFD configuration"""
185 def on_apply(dts
, acg
, xact
, action
, scratch
):
186 """Apply the configuration"""
187 if xact
.xact
is None:
188 if action
== rwdts
.AppconfAction
.INSTALL
:
191 for cfg
in self
._reg
.elements
:
193 self
.callback(cfg
, rwdts
.QueryAction
.CREATE
)
196 self
._log
.error("Reg handle is None during action {} for {}".
197 format(action
, self
.__class
__))
199 except Exception as e
:
200 self
._log
.exception("Adding config {} during restart failed: {}".
204 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
205 dts_member_reg
=self
._reg
,
207 key_name
=self
.key_name())
209 [self
.callback(cfg
, rwdts
.QueryAction
.DELETE
)
210 for cfg
in delete_cfgs
if self
.callback
]
212 [self
.callback(cfg
, rwdts
.QueryAction
.CREATE
)
213 for cfg
in add_cfgs
if self
.callback
]
215 [self
.callback(cfg
, rwdts
.QueryAction
.UPDATE
)
216 for cfg
in update_cfgs
if self
.callback
]
219 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
220 """ on prepare callback """
221 self
._log
.debug("Subscriber DTS prepare for project %s: %s",
222 self
.project
, xact_info
.query_action
)
224 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
225 except rift
.tasklets
.dts
.ResponseError
as e
:
227 "Subscriber DTS prepare for project {}, action {} in class {} failed: {}".
228 format(self
.project
, xact_info
.query_action
, self
.__class
__, e
))
230 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
231 with self
.dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
232 self
._reg
= acg
.register(
233 xpath
=self
.project
.add_project(self
.get_xpath()),
234 flags
=self
.get_reg_flags(),
235 on_prepare
=on_prepare
)