Merge "CLI for OSM"
[osm/SO.git] / common / python / rift / mano / cloud / config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import rw_peas
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 import rift.tasklets
24
25 from gi.repository import (
26 RwcalYang as rwcal,
27 RwDts as rwdts,
28 ProtobufC,
29 )
30
31 from . import accounts
32
class CloudAccountNotFound(Exception):
    """Exception type for a cloud account that could not be located.

    Nothing in the visible part of this module raises it; it is presumably
    provided for callers of this package (TODO confirm against users).
    """
35
36
class CloudAccountError(Exception):
    """Exception type for invalid cloud account configuration.

    Within this module it is raised when a new cloud account message does
    not carry the ``account_type`` field.
    """
39
40
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Split the config elements of a transaction into adds/deletes/updates.

    It is currently difficult to work out exactly what changed in a
    transaction without Pbdelta support (RIFT-4916).  As a workaround the
    elements seen by the transaction are compared against the elements
    currently held by the registration.

    Arguments:
        dts_member_reg - registration object exposing ``elements`` and
                         ``get_xact_elements(xact)``
        xact - the transaction whose elements are inspected
        key_name - name of the attribute that identifies an element

    Returns:
        A tuple ``(added_cfgs, deleted_cfgs, updated_cfgs)`` of lists:
        elements only present in the transaction, elements only present in
        the current config, and transaction elements whose key exists in
        both but which compare unequal to the current element.
    """
    def index_by_key(elements):
        # Later elements win when two of them share the same key.
        return {getattr(element, key_name): element for element in elements}

    pending = index_by_key(list(dts_member_reg.get_xact_elements(xact)))
    committed = index_by_key(list(dts_member_reg.elements))

    pending_keys = set(pending)
    committed_keys = set(committed)

    # Adds: keys that only the transaction knows about.
    added_cfgs = [pending[key] for key in pending_keys - committed_keys]

    # Deletes: keys that only the current config knows about.
    deleted_cfgs = [committed[key] for key in committed_keys - pending_keys]

    # Updates: keys known to both sides whose elements differ.
    updated_cfgs = []
    for key in committed_keys & pending_keys:
        candidate = pending[key]
        if candidate != committed[key]:
            updated_cfgs.append(candidate)

    return added_cfgs, deleted_cfgs, updated_cfgs
65
66
class CloudAccountConfigCallbacks(object):
    """Holder for the hooks invoked when cloud accounts are added or deleted.

    Apply hooks must be plain callables and prepare hooks must be coroutine
    functions.  Any hook that is not supplied is replaced by a no-op of the
    matching kind.
    """

    def __init__(self,
                 on_add_apply=None, on_add_prepare=None,
                 on_delete_apply=None, on_delete_prepare=None):
        """Store the hooks, substituting no-ops and validating their kind.

        Arguments:
            on_add_apply - plain callable used when an account is added
            on_add_prepare - coroutine function used to prepare an add
            on_delete_apply - plain callable used when an account is deleted
            on_delete_prepare - coroutine function used to prepare a delete

        Raises:
            ValueError - if an apply hook is a coroutine function, or a
                         prepare hook is not one
        """
        @asyncio.coroutine
        def prepare_noop(*args, **kwargs):
            pass

        def apply_noop(*args, **kwargs):
            pass

        supplied = (
            ('on_add_apply', on_add_apply),
            ('on_add_prepare', on_add_prepare),
            ('on_delete_apply', on_delete_apply),
            ('on_delete_prepare', on_delete_prepare),
        )
        for attr, hook in supplied:
            setattr(self, attr, hook)

        # Apply hooks are called synchronously, so coroutines are rejected.
        for attr in ('on_add_apply', 'on_delete_apply'):
            hook = getattr(self, attr)
            if hook is None:
                setattr(self, attr, apply_noop)
            elif asyncio.iscoroutinefunction(hook):
                raise ValueError('%s cannot be a coroutine' % (attr,))

        # Prepare hooks are driven with "yield from", so they must be coroutines.
        for attr in ('on_add_prepare', 'on_delete_prepare'):
            hook = getattr(self, attr)
            if hook is None:
                setattr(self, attr, prepare_noop)
            elif not asyncio.iscoroutinefunction(hook):
                raise ValueError("%s must be a coroutine" % attr)
101
102
class CloudAccountConfigSubscriber(object):
    """Subscribes to cloud account config and tracks the resulting accounts.

    The ``accounts`` dictionary maps an account name to the
    ``accounts.CloudAccount`` object built from its config message.  The
    hooks in ``cloud_callbacks`` are invoked as accounts come and go.
    """

    XPATH = "C,/rw-cloud:cloud/rw-cloud:account"

    def __init__(self, dts, log, rwlog_hdl, cloud_callbacks):
        """Store the collaborators; no DTS registration happens here.

        Arguments:
            dts - DTS handle used by register() to create the appconf group
            log - logger used for all messages of this subscriber
            rwlog_hdl - handle passed through to accounts.CloudAccount
            cloud_callbacks - object providing on_add_apply, on_add_prepare,
                              on_delete_apply and on_delete_prepare
        """
        self._dts = dts
        self._log = log
        self._rwlog_hdl = rwlog_hdl
        self._reg = None

        self.accounts = {}

        self._cloud_callbacks = cloud_callbacks

    def add_account(self, account_msg):
        """ Create a cloud account from its config message and announce it

        Arguments:
            account_msg - The cloud account config message
        """
        self._log.info("adding cloud account: {}".format(account_msg))

        account = accounts.CloudAccount(self._log, self._rwlog_hdl, account_msg)
        self.accounts[account.name] = account

        self._cloud_callbacks.on_add_apply(account)

    def delete_account(self, account_name):
        """ Forget a cloud account and announce the deletion

        Arguments:
            account_name - Name of the cloud account to remove

        Raises:
            KeyError - if no account with that name is tracked
        """
        self._log.info("deleting cloud account: {}".format(account_name))
        del self.accounts[account_name]

        self._cloud_callbacks.on_delete_apply(account_name)

    def update_account(self, account_msg):
        """ Update an existing cloud account

        In order to simplify update, turn an update into a delete followed by
        an add.  The drawback to this approach is that we will not support
        updates of an "in-use" cloud account, but this seems like a
        reasonable trade-off.

        Arguments:
            account_msg - The cloud account config message
        """
        self._log.info("updating cloud account: {}".format(account_msg))

        self.delete_account(account_msg.name)
        self.add_account(account_msg)

    def register(self):
        """ Register with DTS for cloud account configuration

        Creates an appconf group whose apply handler is apply_config and
        registers XPATH on it as a cached, delta-ready subscriber with
        on_prepare as the prepare handler.  The registration handle is kept
        in self._reg.
        """
        @asyncio.coroutine
        def apply_config(dts, acg, xact, action, _):
            """ Apply callback from DTS for Cloud Account """
            self._log.debug("Got cloud account apply config (xact: %s) (action: %s)", xact, action)

            if xact.xact is None:
                if action == rwdts.AppconfAction.INSTALL:
                    # No transaction but an INSTALL: re-create every account
                    # found in the registration's current elements.
                    curr_cfg = self._reg.elements
                    for cfg in curr_cfg:
                        self._log.debug("Cloud account being re-added after restart.")
                        if not cfg.has_field('account_type'):
                            raise CloudAccountError("New cloud account must contain account_type field.")
                        self.add_account(cfg)
                else:
                    # When RIFT first comes up, an INSTALL is called with the current config
                    # Since confd doesn't actually persist data this never has any data so
                    # skip this for now.
                    self._log.debug("No xact handle. Skipping apply config")

                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                    dts_member_reg=self._reg,
                    xact=xact,
                    key_name="name",
                    )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_account(cfg.name)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_account(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_account(cfg)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for Cloud Account

            Every query is answered exactly once: ACK when the request is
            acceptable, NACK when it is not supported.
            """

            action = xact_info.query_action
            self._log.debug("Cloud account on_prepare config received (action: %s): %s",
                            xact_info.query_action, msg)

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                if msg.name in self.accounts:
                    self._log.debug("Cloud account already exists. Invoking update request")

                    # Since updates are handled by a delete followed by an add, invoke the
                    # delete prepare callbacks to give clients an opportunity to reject.
                    yield from self._cloud_callbacks.on_delete_prepare(msg.name)

                else:
                    self._log.debug("Cloud account does not already exist. Invoking on_prepare add request")
                    if not msg.has_field('account_type'):
                        raise CloudAccountError("New cloud account must contain account_type field.")

                    account = accounts.CloudAccount(self._log, self._rwlog_hdl, msg)
                    yield from self._cloud_callbacks.on_add_prepare(account)

            elif action == rwdts.QueryAction.DELETE:
                # Check if the entire cloud account got deleted
                fref = ProtobufC.FieldReference.alloc()
                fref.goto_whole_message(msg.to_pbcm())
                if fref.is_field_deleted():
                    yield from self._cloud_callbacks.on_delete_prepare(msg.name)

                else:
                    fref.goto_proto_name(msg.to_pbcm(), "sdn_account")
                    if fref.is_field_deleted():
                        # SDN account disassociated from cloud account
                        account = self.accounts[msg.name]
                        dict_account = account.account_msg.as_dict()
                        del dict_account["sdn_account"]
                        account.cloud_account_msg(dict_account)
                    else:
                        self._log.error("Deleting individual fields for cloud account not supported")
                        xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                        return

            else:
                self._log.error("Action (%s) NOT SUPPORTED", action)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                # Stop here: falling through would send an ACK for the query
                # that has just been NACKed.
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for Cloud Account config using xpath: %s",
                        CloudAccountConfigSubscriber.XPATH,
                        )

        acg_handler = rift.tasklets.AppConfGroup.Handler(
                        on_apply=apply_config,
                        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                    xpath=CloudAccountConfigSubscriber.XPATH,
                    flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                    on_prepare=on_prepare,
                    )