7500bac72d9717e2e80e4d144f06382b9feae9f4
[osm/SO.git] / common / python / rift / mano / config_agent / config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import rw_peas
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 import rift.tasklets
24
25 from gi.repository import (
26 RwcalYang as rwcal,
27 RwDts as rwdts,
28 RwConfigAgentYang as rwcfg_agent,
29 ProtobufC,
30 )
31
class ConfigAccountNotFound(Exception):
    """Error type for a config-agent account that could not be located.

    NOTE(review): nothing in this module raises it; presumably it is used
    by callers elsewhere -- confirm before changing its hierarchy.
    """
34
class ConfigAccountError(Exception):
    """Error type for an invalid config-agent account configuration request."""
37
38
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """ Work out which config elements a transaction adds, deletes or updates

    Unfortunately, it is currently difficult to figure out what has exactly
    changed in this xact without Pbdelta support (RIFT-4916).  As a
    workaround, the pre and post xact elements are fetched and compared by
    key to figure out adds/deletes/updates.

    Each returned list follows the order in which the registration yields
    its elements, so the result does not depend on set/hash ordering.

    Arguments:
        dts_member_reg - registration exposing get_xact_elements(xact) and
                         an "elements" iterable
        xact - the transaction whose pending elements are inspected
        key_name - name of the attribute that identifies an element

    Returns:
        A tuple (added_cfgs, deleted_cfgs, updated_cfgs).  Added and updated
        entries are the elements from the transaction; deleted entries are
        the currently committed elements.
    """
    # Function-scope import keeps this block self-contained.  OrderedDict is
    # used (rather than relying on dict ordering) so iteration order is
    # guaranteed on older interpreters as well.
    from collections import OrderedDict

    xact_key_map = OrderedDict(
        (getattr(cfg, key_name), cfg)
        for cfg in dts_member_reg.get_xact_elements(xact)
    )
    curr_key_map = OrderedDict(
        (getattr(cfg, key_name), cfg)
        for cfg in dts_member_reg.elements
    )

    # Find Adds: present in the xact but not in the current config
    added_cfgs = [
        cfg for key, cfg in xact_key_map.items()
        if key not in curr_key_map
    ]

    # Find Deletes: present in the current config but gone from the xact
    deleted_cfgs = [
        cfg for key, cfg in curr_key_map.items()
        if key not in xact_key_map
    ]

    # Find Updates: present on both sides with differing content
    updated_cfgs = [
        cfg for key, cfg in xact_key_map.items()
        if key in curr_key_map and cfg != curr_key_map[key]
    ]

    return added_cfgs, deleted_cfgs, updated_cfgs
63
64
class ConfigAgentCallbacks(object):
    """ Holder for the hooks fired on config-agent account add/delete

    The apply hooks must be plain callables and the prepare hooks must be
    asyncio coroutine functions.  Any hook left as None is replaced with a
    do-nothing stand-in of the matching kind.
    """

    def __init__(self,
                 on_add_apply=None, on_add_prepare=None,
                 on_delete_apply=None, on_delete_prepare=None):
        """ Store the supplied hooks, defaulting and validating each one

        Arguments:
            on_add_apply - plain callable or None
            on_add_prepare - coroutine function or None
            on_delete_apply - plain callable or None
            on_delete_prepare - coroutine function or None

        Raises:
            ValueError - an apply hook is a coroutine function, or a prepare
                         hook is not one
        """

        def apply_noop(*args, **kwargs):
            pass

        @asyncio.coroutine
        def prepare_noop(*args, **kwargs):
            pass

        self.on_add_apply = on_add_apply
        self.on_add_prepare = on_add_prepare
        self.on_delete_apply = on_delete_apply
        self.on_delete_prepare = on_delete_prepare

        # (hook names, stand-in used for None, must it be a coroutine
        # function?, error text when that requirement is violated)
        hook_rules = (
            (('on_add_apply', 'on_delete_apply'),
             apply_noop, False, '%s cannot be a coroutine'),
            (('on_add_prepare', 'on_delete_prepare'),
             prepare_noop, True, "%s must be a coroutine"),
        )

        for hook_names, stand_in, must_be_coroutine, complaint in hook_rules:
            for hook_name in hook_names:
                hook = getattr(self, hook_name)
                if hook is None:
                    setattr(self, hook_name, stand_in)
                elif bool(asyncio.iscoroutinefunction(hook)) != must_be_coroutine:
                    raise ValueError(complaint % (hook_name,))
99
100
class ConfigAgentSubscriber(object):
    """ Subscriber for config-agent account configuration

    Keeps a name -> account message cache in "accounts" and forwards
    add/delete events to the supplied callbacks object.
    """

    XPATH = "C,/rw-config-agent:config-agent/account"

    def __init__(self, dts, log, config_callbacks):
        """ Create the subscriber (registration happens in register())

        Arguments:
            dts - DTS handle used to create the appconf group
            log - logger used for info/debug/error messages
            config_callbacks - object providing on_add_apply,
                               on_add_prepare, on_delete_apply and
                               on_delete_prepare
        """
        self._dts = dts
        self._log = log
        self._reg = None

        self.accounts = {}

        self._config_callbacks = config_callbacks

    def add_account(self, account_msg):
        """ Cache a config-agent account and fire the add-apply callback

        Arguments:
            account_msg - The config-agent account config message
        """
        self._log.info("adding config account: {}".format(account_msg))

        self.accounts[account_msg.name] = account_msg

        self._config_callbacks.on_add_apply(account_msg)

    def delete_account(self, account_msg):
        """ Drop a cached config-agent account and fire the delete-apply callback

        Arguments:
            account_msg - The config-agent account config message

        Raises:
            KeyError - no account with that name is cached
        """
        self._log.info("deleting config account: {}".format(account_msg.name))
        del self.accounts[account_msg.name]

        self._config_callbacks.on_delete_apply(account_msg)

    def update_account(self, account_msg):
        """ Update an existing config-agent account

        In order to simplify update, turn an update into a delete followed by
        an add.  The drawback to this approach is that we will not support
        updates of an "in-use" config-agent account, but this seems like a
        reasonable trade-off.

        Arguments:
            account_msg - The config-agent account config message
        """

        self._log.info("updating config-agent account: {}".format(account_msg))
        self.delete_account(account_msg)
        self.add_account(account_msg)

    def register(self):
        """ Register for config-agent account configuration with DTS

        Creates an appconf group whose apply handler replays the
        adds/deletes/updates of a transaction onto the account cache, and
        whose prepare handler validates each incoming query and responds
        with ACK or NACK.
        """

        def apply_config(dts, acg, xact, action, _):
            self._log.debug("Got config account apply config (xact: %s) (action: %s)", xact, action)

            if xact.xact is None:
                # When RIFT first comes up, an INSTALL is called with the current config
                # Since confd doesn't actually persist data this never has any data so
                # skip this for now.
                self._log.debug("No xact handle. Skipping apply config")
                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self._reg,
                xact=xact,
                key_name="name",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_account(cfg)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_account(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_account(cfg)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for Config Account """

            action = xact_info.handle.query_action
            self._log.debug("Config account on_prepare config received (action: %s): %s",
                            xact_info.handle.query_action, msg)

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                # If the account already exists, then this is an update.
                if msg.name in self.accounts:
                    self._log.debug("Config account already exists. Invoking on_prepare update request")
                    if msg.has_field("account_type"):
                        raise ConfigAccountError("Cannot change config's account-type")

                    # Since updates are handled by a delete followed by an add, invoke the
                    # delete prepare callbacks to give clients an opportunity to reject.
                    yield from self._config_callbacks.on_delete_prepare(msg.name)

                else:
                    self._log.debug("Config account does not already exist. Invoking on_prepare add request")
                    if not msg.has_field('account_type'):
                        raise ConfigAccountError("New Config account must contain account_type field.")

                    yield from self._config_callbacks.on_add_prepare(msg)

            elif action == rwdts.QueryAction.DELETE:
                # Check if the entire config account got deleted
                fref = ProtobufC.FieldReference.alloc()
                fref.goto_whole_message(msg.to_pbcm())
                if fref.is_field_deleted():
                    yield from self._config_callbacks.on_delete_prepare(msg.name)
                else:
                    self._log.error("Deleting individual fields for config account not supported")
                    xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                    return

            else:
                self._log.error("Action (%s) NOT SUPPORTED", action)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                # Stop here: the query has already been rejected, so it must
                # not fall through to the ACK below and be answered twice.
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for Config Account config using xpath: %s",
                        ConfigAgentSubscriber.XPATH,
                        )

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                xpath=ConfigAgentSubscriber.XPATH,
                flags=rwdts.Flag.SUBSCRIBER,
                on_prepare=on_prepare,
            )