3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 gi
.require_version('RwDts', '1.0')
25 from gi
.repository
import (
31 from . import accounts
33 class CloudAccountNotFound(Exception):
37 class CloudAccountError(Exception):
41 def get_add_delete_update_cfgs(dts_member_reg
, xact
, key_name
):
42 # Unforunately, it is currently difficult to figure out what has exactly
43 # changed in this xact without Pbdelta support (RIFT-4916)
44 # As a workaround, we can fetch the pre and post xact elements and
45 # perform a comparison to figure out adds/deletes/updates
46 xact_cfgs
= list(dts_member_reg
.get_xact_elements(xact
))
47 curr_cfgs
= list(dts_member_reg
.elements
)
49 xact_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in xact_cfgs
}
50 curr_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in curr_cfgs
}
53 added_keys
= set(xact_key_map
) - set(curr_key_map
)
54 added_cfgs
= [xact_key_map
[key
] for key
in added_keys
]
57 deleted_keys
= set(curr_key_map
) - set(xact_key_map
)
58 deleted_cfgs
= [curr_key_map
[key
] for key
in deleted_keys
]
61 updated_keys
= set(curr_key_map
) & set(xact_key_map
)
62 updated_cfgs
= [xact_key_map
[key
] for key
in updated_keys
if xact_key_map
[key
] != curr_key_map
[key
]]
64 return added_cfgs
, deleted_cfgs
, updated_cfgs
67 class CloudAccountConfigCallbacks(object):
69 on_add_apply
=None, on_add_prepare
=None,
70 on_delete_apply
=None, on_delete_prepare
=None):
73 def prepare_noop(*args
, **kwargs
):
76 def apply_noop(*args
, **kwargs
):
79 self
.on_add_apply
= on_add_apply
80 self
.on_add_prepare
= on_add_prepare
81 self
.on_delete_apply
= on_delete_apply
82 self
.on_delete_prepare
= on_delete_prepare
84 for f
in ('on_add_apply', 'on_delete_apply'):
85 ref
= getattr(self
, f
)
87 setattr(self
, f
, apply_noop
)
90 if asyncio
.iscoroutinefunction(ref
):
91 raise ValueError('%s cannot be a coroutine' % (f
,))
93 for f
in ('on_add_prepare', 'on_delete_prepare'):
94 ref
= getattr(self
, f
)
96 setattr(self
, f
, prepare_noop
)
99 if not asyncio
.iscoroutinefunction(ref
):
100 raise ValueError("%s must be a coroutine" % f
)
103 class CloudAccountConfigSubscriber(object):
104 XPATH
= "C,/rw-cloud:cloud/rw-cloud:account"
106 def __init__(self
, dts
, log
, rwlog_hdl
, cloud_callbacks
):
109 self
._rwlog
_hdl
= rwlog_hdl
114 self
._cloud
_callbacks
= cloud_callbacks
116 def add_account(self
, account_msg
):
117 self
._log
.info("adding cloud account: {}".format(account_msg
))
119 account
= accounts
.CloudAccount(self
._log
, self
._rwlog
_hdl
, account_msg
)
120 self
.accounts
[account
.name
] = account
122 self
._cloud
_callbacks
.on_add_apply(account
)
124 def delete_account(self
, account_name
):
125 self
._log
.info("deleting cloud account: {}".format(account_name
))
126 del self
.accounts
[account_name
]
128 self
._cloud
_callbacks
.on_delete_apply(account_name
)
130 def update_account(self
, account_msg
):
131 """ Update an existing cloud account
133 In order to simplify update, turn an update into a delete followed by
134 an add. The drawback to this approach is that we will not support
135 updates of an "in-use" cloud account, but this seems like a
136 reasonable trade-off.
140 account_msg - The cloud account config message
142 self
._log
.info("updating cloud account: {}".format(account_msg
))
144 self
.delete_account(account_msg
.name
)
145 self
.add_account(account_msg
)
149 def apply_config(dts
, acg
, xact
, action
, _
):
150 self
._log
.debug("Got cloud account apply config (xact: %s) (action: %s)", xact
, action
)
152 if xact
.xact
is None:
153 if action
== rwdts
.AppconfAction
.INSTALL
:
154 curr_cfg
= self
._reg
.elements
156 self
._log
.debug("Cloud account being re-added after restart.")
157 if not cfg
.has_field('account_type'):
158 raise CloudAccountError("New cloud account must contain account_type field.")
159 self
.add_account(cfg
)
161 # When RIFT first comes up, an INSTALL is called with the current config
162 # Since confd doesn't actally persist data this never has any data so
164 self
._log
.debug("No xact handle. Skipping apply config")
168 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
169 dts_member_reg
=self
._reg
,
175 for cfg
in delete_cfgs
:
176 self
.delete_account(cfg
.name
)
180 self
.add_account(cfg
)
183 for cfg
in update_cfgs
:
184 self
.update_account(cfg
)
187 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
188 """ Prepare callback from DTS for Cloud Account """
190 action
= xact_info
.query_action
191 self
._log
.debug("Cloud account on_prepare config received (action: %s): %s",
192 xact_info
.query_action
, msg
)
194 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
195 if msg
.name
in self
.accounts
:
196 self
._log
.debug("Cloud account already exists. Invoking update request")
198 # Since updates are handled by a delete followed by an add, invoke the
199 # delete prepare callbacks to give clients an opportunity to reject.
200 yield from self
._cloud
_callbacks
.on_delete_prepare(msg
.name
)
203 self
._log
.debug("Cloud account does not already exist. Invoking on_prepare add request")
204 if not msg
.has_field('account_type'):
205 raise CloudAccountError("New cloud account must contain account_type field.")
207 account
= accounts
.CloudAccount(self
._log
, self
._rwlog
_hdl
, msg
)
208 yield from self
._cloud
_callbacks
.on_add_prepare(account
)
210 elif action
== rwdts
.QueryAction
.DELETE
:
211 # Check if the entire cloud account got deleted
212 fref
= ProtobufC
.FieldReference
.alloc()
213 fref
.goto_whole_message(msg
.to_pbcm())
214 if fref
.is_field_deleted():
215 yield from self
._cloud
_callbacks
.on_delete_prepare(msg
.name
)
218 fref
.goto_proto_name(msg
.to_pbcm(), "sdn_account")
219 if fref
.is_field_deleted():
220 # SDN account disassociated from cloud account
221 account
= self
.accounts
[msg
.name
]
222 dict_account
= account
.account_msg
.as_dict()
223 del dict_account
["sdn_account"]
224 account
.cloud_account_msg(dict_account
)
226 self
._log
.error("Deleting individual fields for cloud account not supported")
227 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
231 self
._log
.error("Action (%s) NOT SUPPORTED", action
)
232 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
234 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
236 self
._log
.debug("Registering for Cloud Account config using xpath: %s",
237 CloudAccountConfigSubscriber
.XPATH
,
240 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
241 on_apply
=apply_config
,
244 with self
._dts
.appconf_group_create(acg_handler
) as acg
:
245 self
._reg
= acg
.register(
246 xpath
=CloudAccountConfigSubscriber
.XPATH
,
247 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
248 on_prepare
=on_prepare
,