a9de01b56da0350bb2893460df943146d0fe389d
[osm/SO.git] / common / python / rift / mano / sdn / config.py
1
2 #
3 # Copyright 2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import gi
21 gi.require_version('RwDts', '1.0')
22 import rift.tasklets
23
24 from gi.repository import (
25 RwDts as rwdts,
26 ProtobufC,
27 )
28
29 from . import accounts
30
31
32 class SDNAccountNotFound(Exception):
33 pass
34
35
36 class SDNAccountError(Exception):
37 pass
38
39
40 def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
41 # Unforunately, it is currently difficult to figure out what has exactly
42 # changed in this xact without Pbdelta support (RIFT-4916)
43 # As a workaround, we can fetch the pre and post xact elements and
44 # perform a comparison to figure out adds/deletes/updates
45 xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
46 curr_cfgs = list(dts_member_reg.elements)
47
48 xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
49 curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
50
51 # Find Adds
52 added_keys = set(xact_key_map) - set(curr_key_map)
53 added_cfgs = [xact_key_map[key] for key in added_keys]
54
55 # Find Deletes
56 deleted_keys = set(curr_key_map) - set(xact_key_map)
57 deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
58
59 # Find Updates
60 updated_keys = set(curr_key_map) & set(xact_key_map)
61 updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
62
63 return added_cfgs, deleted_cfgs, updated_cfgs
64
65
66 class SDNAccountConfigCallbacks(object):
67 def __init__(self,
68 on_add_apply=None, on_add_prepare=None,
69 on_delete_apply=None, on_delete_prepare=None):
70
71 @asyncio.coroutine
72 def prepare_noop(*args, **kwargs):
73 pass
74
75 def apply_noop(*args, **kwargs):
76 pass
77
78 self.on_add_apply = on_add_apply
79 self.on_add_prepare = on_add_prepare
80 self.on_delete_apply = on_delete_apply
81 self.on_delete_prepare = on_delete_prepare
82
83 for f in ('on_add_apply', 'on_delete_apply'):
84 ref = getattr(self, f)
85 if ref is None:
86 setattr(self, f, apply_noop)
87 continue
88
89 if asyncio.iscoroutinefunction(ref):
90 raise ValueError('%s cannot be a coroutine' % (f,))
91
92 for f in ('on_add_prepare', 'on_delete_prepare'):
93 ref = getattr(self, f)
94 if ref is None:
95 setattr(self, f, prepare_noop)
96 continue
97
98 if not asyncio.iscoroutinefunction(ref):
99 raise ValueError("%s must be a coroutine" % f)
100
101
102 class SDNAccountConfigSubscriber(object):
103 XPATH = "C,/rw-sdn:sdn/rw-sdn:account"
104
105 def __init__(self, dts, log, rwlog_hdl, sdn_callbacks, acctstore):
106 self._dts = dts
107 self._log = log
108 self._rwlog_hdl = rwlog_hdl
109 self._reg = None
110
111 self.accounts = acctstore
112
113 self._sdn_callbacks = sdn_callbacks
114
115 def add_account(self, account_msg):
116 self._log.info("adding sdn account: {}".format(account_msg))
117
118 account = accounts.SDNAccount(self._log, self._rwlog_hdl, account_msg)
119 self.accounts[account.name] = account
120
121 self._sdn_callbacks.on_add_apply(account)
122
123 def delete_account(self, account_name):
124 self._log.info("deleting sdn account: {}".format(account_name))
125 del self.accounts[account_name]
126
127 self._sdn_callbacks.on_delete_apply(account_name)
128
129 def update_account(self, account_msg):
130 """ Update an existing sdn account
131
132 In order to simplify update, turn an update into a delete followed by
133 an add. The drawback to this approach is that we will not support
134 updates of an "in-use" sdn account, but this seems like a
135 reasonable trade-off.
136
137
138 Arguments:
139 account_msg - The sdn account config message
140 """
141 self._log.info("updating sdn account: {}".format(account_msg))
142
143 self.delete_account(account_msg.name)
144 self.add_account(account_msg)
145
146 def register(self):
147 @asyncio.coroutine
148 def apply_config(dts, acg, xact, action, _):
149 self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
150
151 if xact.xact is None:
152 if action == rwdts.AppconfAction.INSTALL:
153 curr_cfg = self._reg.elements
154 for cfg in curr_cfg:
155 self._log.debug("SDN account being re-added after restart.")
156 if not cfg.has_field('account_type'):
157 raise SDNAccountError("New SDN account must contain account_type field.")
158 self.add_account(cfg)
159 else:
160 # When RIFT first comes up, an INSTALL is called with the current config
161 # Since confd doesn't actally persist data this never has any data so
162 # skip this for now.
163 self._log.debug("No xact handle. Skipping apply config")
164
165 return
166
167 add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
168 dts_member_reg=self._reg,
169 xact=xact,
170 key_name="name",
171 )
172
173 # Handle Deletes
174 for cfg in delete_cfgs:
175 self.delete_account(cfg.name)
176
177 # Handle Adds
178 for cfg in add_cfgs:
179 self.add_account(cfg)
180
181 # Handle Updates
182 for cfg in update_cfgs:
183 self.update_account(cfg)
184
185 @asyncio.coroutine
186 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
187 """ Prepare callback from DTS for SDN Account """
188
189 action = xact_info.query_action
190 self._log.debug("SDN account on_prepare config received (action: %s): %s",
191 xact_info.query_action, msg)
192
193 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
194 if msg.name in self.accounts:
195 self._log.debug("SDN account already exists. Invoking update request")
196
197 # Since updates are handled by a delete followed by an add, invoke the
198 # delete prepare callbacks to give clients an opportunity to reject.
199 yield from self._sdn_callbacks.on_delete_prepare(msg.name)
200
201 else:
202 self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
203 if not msg.has_field('account_type'):
204 raise SDNAccountError("New sdn account must contain account_type field.")
205
206 account = accounts.SDNAccount(self._log, self._rwlog_hdl, msg)
207 yield from self._sdn_callbacks.on_add_prepare(account)
208
209 elif action == rwdts.QueryAction.DELETE:
210 # Check if the entire SDN account got deleted
211 fref = ProtobufC.FieldReference.alloc()
212 fref.goto_whole_message(msg.to_pbcm())
213 if fref.is_field_deleted():
214 yield from self._sdn_callbacks.on_delete_prepare(msg.name)
215
216 else:
217 self._log.error("Deleting individual fields for SDN account not supported")
218 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
219 return
220
221 else:
222 self._log.error("Action (%s) NOT SUPPORTED", action)
223 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
224
225 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
226
227 self._log.debug("Registering for SDN Account config using xpath: %s",
228 SDNAccountConfigSubscriber.XPATH,
229 )
230
231 acg_handler = rift.tasklets.AppConfGroup.Handler(
232 on_apply=apply_config,
233 )
234
235 with self._dts.appconf_group_create(acg_handler) as acg:
236 self._reg = acg.register(
237 xpath=SDNAccountConfigSubscriber.XPATH,
238 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
239 on_prepare=on_prepare,
240 )