3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
import asyncio

from gi.repository import (RwDts as rwdts, ProtobufC)

from ..core import DtsHandler
class SubscriberDtsHandler(DtsHandler):
    """A common class for all subscribers.

    Holds the optional user callback and provides the default set of
    registration flags used when a subscriber registers with DTS.
    """

    # NOTE(review): the decorator is not visible in the reviewed text but is
    # required, since the method receives ``cls`` and returns ``cls(...)``.
    @classmethod
    def from_tasklet(cls, tasklet, callback=None):
        """Convenience method to build the object from tasklet

        Args:
            tasklet (rift.tasklets.Tasklet): Tasklet; its ``log``, ``dts``
                and ``loop`` attributes are passed to the constructor.
            callback (None, optional): Callable, which will be invoked as
                ``callback(msg, action)`` when the subscriber delivers a
                change. May be left as None.

        Signature of callback:
            Args:
                msg: The Gi Object msg from DTS
                action(rwdts.QueryAction): Action type

        Returns:
            A new instance of ``cls``.
        """
        return cls(tasklet.log, tasklet.dts, tasklet.loop, callback=callback)

    def __init__(self, log, dts, loop, callback=None):
        """Initialise the handler.

        Args:
            log: Logger, forwarded to the DtsHandler constructor.
            dts: DTS handle, forwarded to the DtsHandler constructor.
            loop: Event loop, forwarded to the DtsHandler constructor.
            callback (None, optional): Callable stored as ``self.callback``.
        """
        super().__init__(log, dts, loop)
        self.callback = callback

    def get_reg_flags(self):
        """Default set of REG flags, can be over-ridden by sub classes.

        Returns:
            The rwdts.Flag values SUBSCRIBER, DELTA_READY and CACHE combined
            with bitwise OR.
        """
        return (rwdts.Flag.SUBSCRIBER
                | rwdts.Flag.DELTA_READY
                | rwdts.Flag.CACHE)
class AbstractOpdataSubscriber(SubscriberDtsHandler):
    """Abstract class that simplifies the process of creating subscribers
    for opdata.

    Opdata subscriber can be created in one step by subclassing and implementing
    the MANDATORY get_xpath() method
    """

    def get_xpath(self):
        """Return the xpath to subscribe to.

        Returns:
            str: xpath

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError(
            "{} must implement get_xpath()".format(type(self).__name__))

    # NOTE(review): decorator restored; the body uses ``yield from`` and the
    # DTS layer is assumed to expect generator-based coroutines - confirm.
    @asyncio.coroutine
    def register(self):
        """Triggers the registration

        Registers with DTS on ``get_xpath()`` using ``get_reg_flags()`` and
        stores the registration handle in ``self.reg``. Each change seen in
        the prepare phase is held back and handed to ``self.callback`` only
        when the owning transaction commits.
        """
        # (msg, action) recorded at prepare time, keyed by transaction id.
        # A later prepare for the same transaction replaces the earlier entry.
        xacts = {}

        def on_commit(xact_info):
            """Deliver the change deferred for this transaction, if any."""
            xact_id = xact_info.handle.get_xact().id

            # A commit may arrive for a transaction that recorded nothing
            # during prepare; pop with a default instead of raising KeyError.
            deferred = xacts.pop(xact_id, None)

            # The callback is optional (defaults to None in the constructor).
            if deferred is not None and self.callback:
                msg, action = deferred
                self.callback(msg, action)

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Record the change and acknowledge the prepare."""
            try:
                # Defer all actions till the commit state.
                xacts[xact_info.xact.id] = (msg, action)

            except Exception as e:
                self.log.exception(e)

            finally:
                # Always answer the transaction, even if recording failed.
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_event = asyncio.Event(loop=self.loop)

        @asyncio.coroutine
        def on_ready(_, status):
            # NOTE(review): body reconstructed - signalling reg_event is the
            # only use the event has in this method; confirm.
            reg_event.set()

        # NOTE(review): on_ready/on_commit are defined above and are assumed
        # to be passed here alongside on_prepare; confirm the keyword names.
        handler = rift.tasklets.DTS.RegistrationHandler(
            on_ready=on_ready,
            on_prepare=on_prepare,
            on_commit=on_commit,
        )

        self.reg = yield from self.dts.register(
            xpath=self.get_xpath(),
            flags=self.get_reg_flags(),
            handler=handler,
        )

        # Waiting for the ready event is intentionally left disabled:
        # yield from reg_event.wait()

        assert self.reg is not None

    def deregister(self):
        """Deregister the handle stored in ``self.reg`` by register()."""
        self.reg.deregister()
class AbstractConfigSubscriber(SubscriberDtsHandler):
    """Abstract class that simplifies the process of creating subscribers
    for config data.

    Config subscriber can be created in one step by subclassing and implementing
    the MANDATORY get_xpath() method. register() also calls key_name(), so
    subclasses must provide that as well.
    """

    def get_xpath(self):
        """Return the xpath to subscribe to.

        Returns:
            str: xpath

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError(
            "{} must implement get_xpath()".format(type(self).__name__))

    def key_name(self):
        """Return the attribute name that identifies a config element.

        The value is passed as ``key_name`` to get_add_delete_update_cfgs(),
        which reads it from every element with ``getattr``.

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError(
            "{} must implement key_name()".format(type(self).__name__))

    def get_add_delete_update_cfgs(self, dts_member_reg, xact, key_name):
        """Work out which elements a transaction adds, deletes and updates.

        Args:
            dts_member_reg: Registration exposing ``get_xact_elements(xact)``
                and ``elements``.
            xact: Transaction handed to ``get_xact_elements``.
            key_name (str): Attribute used to match elements between the
                two collections.

        Returns:
            tuple: ``(added_cfgs, deleted_cfgs, updated_cfgs)``. Added and
            updated entries are the transaction's elements; deleted entries
            are the current elements. Each list follows the iteration order
            of the collection it was taken from.
        """
        # Unfortunately, it is currently difficult to figure out what has exactly
        # changed in this xact without Pbdelta support (RIFT-4916)
        # As a workaround, we can fetch the pre and post xact elements and
        # perform a comparison to figure out adds/deletes/updates
        xact_key_map = {getattr(cfg, key_name): cfg
                        for cfg in dts_member_reg.get_xact_elements(xact)}
        curr_key_map = {getattr(cfg, key_name): cfg
                        for cfg in dts_member_reg.elements}

        # Keys only present in the transaction are additions.
        added_cfgs = [cfg for key, cfg in xact_key_map.items()
                      if key not in curr_key_map]

        # Keys only present in the current elements are deletions.
        deleted_cfgs = [cfg for key, cfg in curr_key_map.items()
                        if key not in xact_key_map]

        # Keys present on both sides are updates when the elements differ.
        updated_cfgs = [cfg for key, cfg in xact_key_map.items()
                        if key in curr_key_map and cfg != curr_key_map[key]]

        return added_cfgs, deleted_cfgs, updated_cfgs

    # NOTE(review): decorator added so this register() can be driven with
    # ``yield from`` like AbstractOpdataSubscriber.register(); confirm.
    @asyncio.coroutine
    def register(self):
        """ Register for VNFD configuration"""

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            # The previous code computed an unused ``is_recovery`` flag
            # (xact.xact is None and action == AppconfAction.INSTALL); that
            # case gets no special handling here.
            add_cfgs, delete_cfgs, update_cfgs = self.get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name=self.key_name())

            callback = self.callback
            if not callback:
                return

            # Deletions first, then creations, then updates.
            for cfg in delete_cfgs:
                callback(cfg, rwdts.QueryAction.DELETE)

            for cfg in add_cfgs:
                callback(cfg, rwdts.QueryAction.CREATE)

            for cfg in update_cfgs:
                callback(cfg, rwdts.QueryAction.UPDATE)

        # NOTE(review): assumed to be a generator-based coroutine like the
        # opdata on_prepare handler; confirm against AppConfGroup.
        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback """
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self.dts.appconf_group_create(handler=acg_hdl) as acg:
            self.reg = acg.register(
                xpath=self.get_xpath(),
                flags=self.get_reg_flags(),
                on_prepare=on_prepare)

    def deregister(self):
        """Deregister the handle stored in ``self.reg`` by register()."""
        self.reg.deregister()