Merge from OSM SO master
[osm/SO.git] / common / python / rift / mano / cloud / config.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import rw_peas
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 import rift.tasklets
24 from rift.mano.utils.project import get_add_delete_update_cfgs
25
26 from gi.repository import (
27 RwcalYang as rwcal,
28 RwDts as rwdts,
29 ProtobufC,
30 RwCloudYang,
31 )
32
33 from . import accounts
34
35 class CloudAccountNotFound(Exception):
36 pass
37
38
39 class CloudAccountError(Exception):
40 pass
41
42
43 class CloudAccountConfigCallbacks(object):
44 def __init__(self,
45 on_add_apply=None, on_add_prepare=None,
46 on_delete_apply=None, on_delete_prepare=None):
47
48 @asyncio.coroutine
49 def prepare_noop(*args, **kwargs):
50 pass
51
52 def apply_noop(*args, **kwargs):
53 pass
54
55 self.on_add_apply = on_add_apply
56 self.on_add_prepare = on_add_prepare
57 self.on_delete_apply = on_delete_apply
58 self.on_delete_prepare = on_delete_prepare
59
60 for f in ('on_add_apply', 'on_delete_apply'):
61 ref = getattr(self, f)
62 if ref is None:
63 setattr(self, f, apply_noop)
64 continue
65
66 if asyncio.iscoroutinefunction(ref):
67 raise ValueError('%s cannot be a coroutine' % (f,))
68
69 for f in ('on_add_prepare', 'on_delete_prepare'):
70 ref = getattr(self, f)
71 if ref is None:
72 setattr(self, f, prepare_noop)
73 continue
74
75 if not asyncio.iscoroutinefunction(ref):
76 raise ValueError("%s must be a coroutine" % f)
77
78
79 class CloudAccountConfigSubscriber(object):
80 XPATH = "C,/rw-cloud:cloud/rw-cloud:account"
81
82 def __init__(self, dts, log, rwlog_hdl, project, cloud_callbacks):
83 self._dts = dts
84 self._log = log
85 self._rwlog_hdl = rwlog_hdl
86 self._project = project
87 self._reg = None
88
89 self.accounts = {}
90
91 self._cloud_callbacks = cloud_callbacks
92
93 def add_account(self, account_msg):
94 self._log.info("adding cloud account: {}".format(account_msg))
95
96 account = accounts.CloudAccount(self._log, self._rwlog_hdl, account_msg)
97 self.accounts[account.name] = account
98
99 self._cloud_callbacks.on_add_apply(account)
100
101 def delete_account(self, account_name):
102 self._log.info("deleting cloud account: {}".format(account_name))
103 del self.accounts[account_name]
104
105 self._cloud_callbacks.on_delete_apply(account_name)
106
107 def update_account(self, account_msg):
108 """ Update an existing cloud account
109
110 In order to simplify update, turn an update into a delete followed by
111 an add. The drawback to this approach is that we will not support
112 updates of an "in-use" cloud account, but this seems like a
113 reasonable trade-off.
114
115
116 Arguments:
117 account_msg - The cloud account config message
118 """
119 self._log.info("updating cloud account: {}".format(account_msg))
120
121 self.delete_account(account_msg.name)
122 self.add_account(account_msg)
123
124 def deregister(self):
125 self._log.debug("Project {}: De-register cloud account handler".
126 format(self._project))
127 if self._reg:
128 self._reg.deregister()
129 self._reg = None
130
131 @asyncio.coroutine
132 def register(self):
133 @asyncio.coroutine
134 def apply_config(dts, acg, xact, action, scratch):
135 self._log.debug("Got cloud account apply config (xact: %s) (action: %s)", xact, action)
136
137 if xact.xact is None:
138 if action == rwdts.AppconfAction.INSTALL:
139 curr_cfg = self._reg.elements
140 for cfg in curr_cfg:
141 self._log.debug("Cloud account being re-added after restart.")
142 if not cfg.has_field('account_type'):
143 raise CloudAccountError("New cloud account must contain account_type field.")
144 self.add_account(cfg)
145 else:
146 # When RIFT first comes up, an INSTALL is called with the current config
147 # Since confd doesn't actally persist data this never has any data so
148 # skip this for now.
149 self._log.debug("No xact handle. Skipping apply config")
150
151 return
152
153 add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
154 dts_member_reg=self._reg,
155 xact=xact,
156 key_name="name",
157 )
158
159 # Handle Deletes
160 for cfg in delete_cfgs:
161 self.delete_account(cfg.name)
162
163 # Handle Adds
164 for cfg in add_cfgs:
165 self.add_account(cfg)
166
167 # Handle Updates
168 for cfg in update_cfgs:
169 self.update_account(cfg)
170
171 @asyncio.coroutine
172 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
173 """ Prepare callback from DTS for Cloud Account """
174
175 action = xact_info.query_action
176
177 xpath = ks_path.to_xpath(RwCloudYang.get_schema())
178
179 self._log.debug("Cloud account on_prepare config received (action: %s): %s",
180 xact_info.query_action, msg)
181
182 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
183 if msg.name in self.accounts:
184 self._log.debug("Cloud account {} already exists. " \
185 "Invoking update request".format(msg.name))
186
187 # Since updates are handled by a delete followed by an add, invoke the
188 # delete prepare callbacks to give clients an opportunity to reject.
189 yield from self._cloud_callbacks.on_delete_prepare(msg.name)
190
191 else:
192 self._log.debug("Cloud account does not already exist. Invoking on_prepare add request")
193 if not msg.has_field('account_type'):
194 raise CloudAccountError("New cloud account must contain account_type field.")
195
196 account = accounts.CloudAccount(self._log, self._rwlog_hdl, msg)
197 yield from self._cloud_callbacks.on_add_prepare(account)
198
199 elif action == rwdts.QueryAction.DELETE:
200 # Check if the entire cloud account got deleted
201 fref = ProtobufC.FieldReference.alloc()
202 fref.goto_whole_message(msg.to_pbcm())
203 if fref.is_field_deleted():
204 yield from self._cloud_callbacks.on_delete_prepare(msg.name)
205
206 else:
207 fref.goto_proto_name(msg.to_pbcm(), "sdn_account")
208 if fref.is_field_deleted():
209 # SDN account disassociated from cloud account
210 account = self.accounts[msg.name]
211 dict_account = account.account_msg.as_dict()
212 del dict_account["sdn_account"]
213 account.cloud_account_msg(dict_account)
214 else:
215 self._log.error("Deleting individual fields for cloud account not supported")
216 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
217 return
218
219 else:
220 self._log.error("Action (%s) NOT SUPPORTED", action)
221 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
222
223 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
224
225 self._log.debug("Registering for Cloud Account config using xpath: %s",
226 CloudAccountConfigSubscriber.XPATH,
227 )
228
229 acg_handler = rift.tasklets.AppConfGroup.Handler(
230 on_apply=apply_config,
231 )
232
233 xpath = self._project.add_project(CloudAccountConfigSubscriber.XPATH)
234 with self._dts.appconf_group_create(acg_handler) as acg:
235 self._reg = acg.register(
236 xpath=xpath,
237 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
238 on_prepare=on_prepare,
239 )