Merge from OSM SO master
[osm/SO.git] / common / python / rift / mano / dts / subscriber / core.py
1 """
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import abc
25 import collections
26 import asyncio
27
28 from gi.repository import (RwDts as rwdts, ProtobufC)
29 import rift.tasklets
30 from rift.mano.utils.project import (
31 get_add_delete_update_cfgs,
32 )
33
34 from ..core import DtsHandler
35
36
37 class SubscriberDtsHandler(DtsHandler):
38 """A common class for all subscribers.
39 """
40 @classmethod
41 def from_project(cls, proj, callback=None):
42 """Convenience method to build the object from tasklet
43
44 Args:
45 proj (rift.mano.utils.project.ManoProject): Project
46 callback (None, optional): Callable, which will be invoked on
47 subscriber changes.
48
49 Signature of callback:
50 Args:
51 msg: The Gi Object msg from DTS
52 action(rwdts.QueryAction): Action type
53 """
54 return cls(proj.log, proj.dts, proj.loop, proj, callback=callback)
55
56 def __init__(self, log, dts, loop, project, callback=None):
57 super().__init__(log, dts, loop, project)
58 self.callback = callback
59
60 def get_reg_flags(self):
61 """Default set of REG flags, can be over-ridden by sub classes.
62
63 Returns:
64 Set of rwdts.Flag types.
65 """
66 return rwdts.Flag.SUBSCRIBER|rwdts.Flag.DELTA_READY|rwdts.Flag.CACHE
67
68
69
70 class AbstractOpdataSubscriber(SubscriberDtsHandler):
71 """Abstract class that simplifies the process of creating subscribers
72 for opdata.
73
74 Opdata subscriber can be created in one step by subclassing and implementing
75 the MANDATORY get_xpath() method
76
77 """
78 @abc.abstractmethod
79 def get_xpath(self):
80 """
81 Returns:
82 str: xpath
83 """
84 pass
85
86 @asyncio.coroutine
87 def register(self):
88 """Triggers the registration
89 """
90
91 if self.reg:
92 self._log.warning("RPC already registered for project {}".
93 format(self._project.name))
94 return
95
96 xacts = {}
97
98 def on_commit(xact_info):
99 xact_id = xact_info.handle.get_xact().id
100 if xact_id in xacts:
101 msg, action = xacts.pop(xact_id)
102
103 if self.callback:
104 self.callback(msg, action)
105
106 return rwdts.MemberRspCode.ACTION_OK
107
108 @asyncio.coroutine
109 def on_prepare(xact_info, action, ks_path, msg):
110 try:
111 # Defer all actions till the commit state.
112 xacts[xact_info.xact.id] = (msg, action)
113
114 except Exception as e:
115 self.log.exception(e)
116
117 finally:
118 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
119
120 reg_event = asyncio.Event(loop=self.loop)
121
122 @asyncio.coroutine
123 def on_ready(_, status):
124 reg_event.set()
125
126 handler = rift.tasklets.DTS.RegistrationHandler(
127 on_ready=on_ready,
128 on_prepare=on_prepare,
129 on_commit=on_commit
130 )
131
132 self.reg = yield from self.dts.register(
133 xpath=self.project.add_project(self.get_xpath()),
134 flags=self.get_reg_flags(),
135 handler=handler)
136
137 # yield from reg_event.wait()
138
139 assert self.reg is not None
140
141
142 class AbstractConfigSubscriber(SubscriberDtsHandler):
143 """Abstract class that simplifies the process of creating subscribers
144 for config data.
145
146 Config subscriber can be created in one step by subclassing and implementing
147 the MANDATORY get_xpath() method
148
149 """
150 KEY = "msgs"
151
152 @abc.abstractmethod
153 def get_xpath(self):
154 pass
155
156 @abc.abstractmethod
157 def key_name(self):
158 pass
159
160 @asyncio.coroutine
161 def register(self):
162 """ Register for VNFD configuration"""
163
164 def on_apply(dts, acg, xact, action, scratch):
165 """Apply the configuration"""
166 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
167
168
169 add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
170 dts_member_reg=self.reg,
171 xact=xact,
172 key_name=self.key_name())
173
174 [self.callback(cfg, rwdts.QueryAction.DELETE)
175 for cfg in delete_cfgs if self.callback]
176
177 [self.callback(cfg, rwdts.QueryAction.CREATE)
178 for cfg in add_cfgs if self.callback]
179
180 [self.callback(cfg, rwdts.QueryAction.UPDATE)
181 for cfg in update_cfgs if self.callback]
182
183 @asyncio.coroutine
184 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
185 """ on prepare callback """
186 self._log.debug("Subscriber DTS prepare for project %s: %s",
187 self.project, xact_info.query_action)
188 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
189
190 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
191 with self.dts.appconf_group_create(handler=acg_hdl) as acg:
192 self.reg = acg.register(
193 xpath=self.project.add_project(self.get_xpath()),
194 flags=self.get_reg_flags(),
195 on_prepare=on_prepare)