update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / cloud / config.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import rw_peas
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 import rift.tasklets
24 from rift.mano.utils.project import get_add_delete_update_cfgs
25
26 from gi.repository import (
27 RwcalYang as rwcal,
28 RwDts as rwdts,
29 ProtobufC,
30 RwCloudYang,
31 RwTypes
32 )
33
34 from . import accounts
35
class CloudAccountNotFound(Exception):
    """Exception type for a cloud account that cannot be found.

    Carries no extra state beyond what ``Exception`` provides.
    """
38
39
class CloudAccountError(Exception):
    """Exception type for cloud account errors.

    Carries no extra state beyond what ``Exception`` provides.
    """
42
43
class CloudAccountConfigCallbacks(object):
    """Bundle of the four cloud-account configuration callbacks.

    The two ``*_apply`` callbacks must be plain callables and the two
    ``*_prepare`` callbacks must be asyncio coroutine functions.  Any
    callback left as ``None`` is replaced by a do-nothing stand-in of the
    matching kind, so every attribute is always callable.
    """

    def __init__(self,
                 on_add_apply=None, on_add_prepare=None,
                 on_delete_apply=None, on_delete_prepare=None):
        """Store the callbacks, filling gaps with no-ops and validating kinds.

        Arguments:
            on_add_apply - plain callable, or None
            on_add_prepare - coroutine function, or None
            on_delete_apply - plain callable, or None
            on_delete_prepare - coroutine function, or None

        Raises:
            ValueError - if an apply callback is a coroutine function, or a
                         prepare callback is not one
        """

        def apply_noop(*args, **kwargs):
            pass

        @asyncio.coroutine
        def prepare_noop(*args, **kwargs):
            pass

        self.on_add_apply = on_add_apply
        self.on_add_prepare = on_add_prepare
        self.on_delete_apply = on_delete_apply
        self.on_delete_prepare = on_delete_prepare

        # Apply callbacks are invoked synchronously, so coroutines are refused.
        for attr_name in ('on_add_apply', 'on_delete_apply'):
            callback = getattr(self, attr_name)
            if callback is None:
                setattr(self, attr_name, apply_noop)
            elif asyncio.iscoroutinefunction(callback):
                raise ValueError('%s cannot be a coroutine' % (attr_name,))

        # Prepare callbacks are driven with "yield from", so they must be
        # coroutine functions.
        for attr_name in ('on_add_prepare', 'on_delete_prepare'):
            callback = getattr(self, attr_name)
            if callback is None:
                setattr(self, attr_name, prepare_noop)
            elif not asyncio.iscoroutinefunction(callback):
                raise ValueError("%s must be a coroutine" % attr_name)
78
79
class CloudAccountConfigSubscriber(object):
    """DTS subscriber for cloud account configuration.

    Registers an app-conf handler on XPATH (scoped to the given project) and
    mirrors the configured accounts into ``self.accounts``, a dict mapping
    account name to ``accounts.CloudAccount``.  The supplied callbacks object
    is notified on every add and delete, both at prepare and at apply time.
    """

    XPATH = "C,/rw-cloud:cloud/rw-cloud:account"

    def __init__(self, dts, log, rwlog_hdl, project, cloud_callbacks):
        """Store the collaborators; no DTS registration happens here.

        Arguments:
            dts - DTS handle used to create the app-conf group
            log - logger
            rwlog_hdl - log handle passed through to accounts.CloudAccount
            project - project object; provides add_project() to scope XPATH
            cloud_callbacks - object exposing on_add_apply, on_add_prepare,
                              on_delete_apply and on_delete_prepare
        """
        self._dts = dts
        self._log = log
        self._rwlog_hdl = rwlog_hdl
        self._project = project
        self._reg = None

        self.accounts = {}

        self._cloud_callbacks = cloud_callbacks

    def add_account(self, account_msg):
        """Create a CloudAccount from the message, record it and notify.

        Arguments:
            account_msg - The cloud account config message
        """
        self._log.info("adding cloud account: {}".format(account_msg))

        account = accounts.CloudAccount(self._log, self._rwlog_hdl, account_msg)
        self.accounts[account.name] = account

        self._cloud_callbacks.on_add_apply(account)

    def delete_account(self, account_name):
        """Forget the named account and notify.

        Arguments:
            account_name - name of the account to remove

        Raises:
            KeyError - if no account with that name is recorded
        """
        self._log.info("deleting cloud account: {}".format(account_name))
        del self.accounts[account_name]

        self._cloud_callbacks.on_delete_apply(account_name)

    def update_account(self, account_msg):
        """ Update an existing cloud account

        In order to simplify update, turn an update into a delete followed by
        an add.  The drawback to this approach is that we will not support
        updates of an "in-use" cloud account, but this seems like a
        reasonable trade-off.


        Arguments:
            account_msg - The cloud account config message
        """
        self._log.info("updating cloud account: {}".format(account_msg))

        self.delete_account(account_msg.name)
        self.add_account(account_msg)

    def deregister(self):
        """Drop the DTS registration, if one is currently held."""
        self._log.debug("Project {}: De-register cloud account handler".
                        format(self._project))
        if self._reg:
            self._reg.deregister()
            self._reg = None

    @asyncio.coroutine
    def register(self):
        """Register the apply and prepare callbacks for cloud account config.

        Stores the resulting registration handle in ``self._reg``.
        """

        @asyncio.coroutine
        def apply_config(dts, acg, xact, action, scratch):
            """Apply callback: bring self.accounts in line with the config."""
            self._log.debug("Got cloud account apply config (xact: %s) (action: %s)", xact, action)

            if xact.xact is None:
                if action == rwdts.AppconfAction.INSTALL:
                    curr_cfg = self._reg.elements
                    for cfg in curr_cfg:
                        self._log.debug("Cloud account being re-added after restart.")
                        if not cfg.has_field('account_type'):
                            # There is no xact_info in an apply callback, so a
                            # NACK cannot be sent from here; fail with an
                            # explicit error instead.
                            self._log.error("New cloud account must contain account_type field.")
                            raise CloudAccountError(
                                "New cloud account must contain account_type field.")
                        self.add_account(cfg)
                else:
                    # When RIFT first comes up, an INSTALL is called with the current config
                    # Since confd doesn't actually persist data this never has any data so
                    # skip this for now.
                    self._log.debug("No xact handle. Skipping apply config")

                return

            # Update accounts whose individual fields are being deleted; the
            # names were recorded in scratch by on_prepare.
            if self._reg:
                for cfg in self._reg.get_xact_elements(xact):
                    if cfg.name in scratch.get('cloud_accounts', []):
                        self.update_account(cfg)
                scratch.pop('cloud_accounts', None)

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self._reg,
                xact=xact,
                key_name="name",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_account(cfg.name)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_account(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_account(cfg)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for Cloud Account """

            action = xact_info.query_action

            xpath = ks_path.to_xpath(RwCloudYang.get_schema())

            self._log.debug("Cloud account on_prepare config received (action: %s): %s",
                            xact_info.query_action, msg)

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                # NOTE(review): unlike the DELETE branch, exceptions raised by
                # the prepare callbacks here are not caught - confirm that the
                # caller handles them.
                if msg.name in self.accounts:
                    self._log.debug("Cloud account {} already exists. " \
                                    "Invoking update request".format(msg.name))

                    # Since updates are handled by a delete followed by an add, invoke the
                    # delete prepare callbacks to give clients an opportunity to reject.
                    yield from self._cloud_callbacks.on_delete_prepare(msg.name)

                else:
                    self._log.debug("Cloud account does not already exist. Invoking on_prepare add request")
                    if not msg.has_field('account_type'):
                        self._log.error("New cloud account must contain account_type field.")
                        xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                        return

                    account = accounts.CloudAccount(self._log, self._rwlog_hdl, msg)
                    yield from self._cloud_callbacks.on_add_prepare(account)

            elif action == rwdts.QueryAction.DELETE:
                # Check if the entire cloud account got deleted
                fref = ProtobufC.FieldReference.alloc()
                fref.goto_whole_message(msg.to_pbcm())
                if fref.is_field_deleted():
                    try:
                        yield from self._cloud_callbacks.on_delete_prepare(msg.name)
                    except Exception as e:
                        err_msg = str(e)
                        xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, xpath, err_msg)
                        xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                        return
                else:
                    fref.goto_proto_name(msg.to_pbcm(), "sdn_account")
                    if fref.is_field_deleted():
                        # SDN account disassociated from cloud account
                        account = self.accounts[msg.name]
                        dict_account = account.account_msg.as_dict()
                        del dict_account["sdn_account"]
                        account.cloud_account_msg(dict_account)
                    else:
                        # Some other individual field is being deleted: record
                        # the name so apply_config refreshes the account.
                        cloud_accounts = scratch.setdefault('cloud_accounts', [])
                        cloud_accounts.append(msg.name)

            else:
                self._log.error("Action (%s) NOT SUPPORTED", action)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                # Stop here so the NACK is not followed by the ACK below.
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for Cloud Account config using xpath: %s",
                        CloudAccountConfigSubscriber.XPATH,
                        )

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        xpath = self._project.add_project(CloudAccountConfigSubscriber.XPATH)
        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare,
            )