update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / config_agent / config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import rw_peas
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 import rift.tasklets
24 from rift.mano.utils.project import get_add_delete_update_cfgs
25
26 from gi.repository import (
27 RwcalYang as rwcal,
28 RwDts as rwdts,
29 RwConfigAgentYang as rwcfg_agent,
30 ProtobufC,
31 )
32
class ConfigAccountNotFound(Exception):
    """ Exception type for a config account that could not be found """
35
class ConfigAccountError(Exception):
    """ Exception type for an invalid config account request

    Raised by the DTS prepare handling in this module when a new account
    omits account_type or an update tries to supply it.
    """
38
39
class ConfigAgentCallbacks(object):
    """ Holder for the config-agent account add/delete callbacks

    Every callback is optional. A callback left as None is replaced by a
    no-op of the matching kind, so all four attributes can always be invoked.
    """

    def __init__(self,
                 on_add_apply=None, on_add_prepare=None,
                 on_delete_apply=None, on_delete_prepare=None):
        """ Validate and store the callbacks

        Arguments:
            on_add_apply - function (must not be a coroutine function) or None
            on_add_prepare - coroutine function or None
            on_delete_apply - function (must not be a coroutine function) or None
            on_delete_prepare - coroutine function or None

        Raises:
            ValueError - an apply callback is a coroutine function, or a
                         prepare callback is not a coroutine function
        """

        # Declared with "async def" rather than the asyncio.coroutine
        # decorator: that decorator was removed in Python 3.11, which made
        # this constructor fail with AttributeError. A native coroutine still
        # satisfies asyncio.iscoroutinefunction() and can still be driven
        # with "yield from" from a generator-based coroutine.
        async def prepare_noop(*args, **kwargs):
            pass

        def apply_noop(*args, **kwargs):
            pass

        self.on_add_apply = on_add_apply
        self.on_add_prepare = on_add_prepare
        self.on_delete_apply = on_delete_apply
        self.on_delete_prepare = on_delete_prepare

        # Apply callbacks: default to the plain no-op, reject coroutines.
        for f in ('on_add_apply', 'on_delete_apply'):
            ref = getattr(self, f)
            if ref is None:
                setattr(self, f, apply_noop)
                continue

            if asyncio.iscoroutinefunction(ref):
                raise ValueError('%s cannot be a coroutine' % (f,))

        # Prepare callbacks: default to the coroutine no-op, require coroutines.
        for f in ('on_add_prepare', 'on_delete_prepare'):
            ref = getattr(self, f)
            if ref is None:
                setattr(self, f, prepare_noop)
                continue

            if not asyncio.iscoroutinefunction(ref):
                raise ValueError("%s must be a coroutine" % f)
74
75
class ConfigAgentSubscriber(object):
    """ DTS subscriber for config-agent account configuration

    Tracks the known accounts by name in self.accounts and forwards add and
    delete events to the supplied callbacks object.
    """

    XPATH = "C,/rw-config-agent:config-agent/account"

    def __init__(self, dts, log, project, config_callbacks):
        """ Store the collaborators; nothing is registered until register()

        Arguments:
            dts - DTS handle used to create the appconf group
            log - logger
            project - project object; provides name and add_project(xpath)
            config_callbacks - object exposing on_add_apply, on_add_prepare,
                               on_delete_apply and on_delete_prepare
        """
        self._dts = dts
        self._log = log
        self._project = project
        self._reg = None

        # Account name -> account config message
        self.accounts = {}

        self._config_callbacks = config_callbacks

    def add_account(self, account_msg):
        """ Record a config-agent account and invoke the add-apply callback

        Arguments:
            account_msg - The config-agent account config message
        """
        self._log.info("adding config account: {}".format(account_msg))

        self.accounts[account_msg.name] = account_msg

        self._config_callbacks.on_add_apply(account_msg)

    def delete_account(self, account_msg):
        """ Forget a config-agent account and invoke the delete-apply callback

        Arguments:
            account_msg - The config-agent account config message

        Raises:
            KeyError - no account with this name is currently tracked
        """
        self._log.info("deleting config account: {}".format(account_msg.name))
        del self.accounts[account_msg.name]

        self._config_callbacks.on_delete_apply(account_msg)

    def update_account(self, account_msg):
        """ Update an existing config-agent account

        In order to simplify update, turn an update into a delete followed by
        an add.  The drawback to this approach is that we will not support
        updates of an "in-use" config-agent account, but this seems like a
        reasonable trade-off.

        Arguments:
            account_msg - The config-agent account config message
        """

        self._log.info("updating config-agent account: {}".format(account_msg))
        self.delete_account(account_msg)
        self.add_account(account_msg)

    def deregister(self):
        """ Drop the DTS registration, if one exists """
        self._log.debug("De-register config agent handler for project {}".
                        format(self._project.name))
        if self._reg:
            self._reg.deregister()
            self._reg = None

    def register(self):
        """ Register with DTS for config-agent account configuration

        Creates an appconf group whose apply callback keeps self.accounts in
        step with the committed configuration, and a subscriber registration
        whose prepare callback validates each incoming change.
        """

        def apply_config(dts, acg, xact, action, _):
            """ Apply callback from DTS for Config Account """
            self._log.debug("Got config account apply config (xact: %s) (action: %s)", xact, action)

            if xact.xact is None:
                if action == rwdts.AppconfAction.INSTALL:
                    # No transaction on INSTALL: re-add every element the
                    # registration already holds.
                    curr_cfg = self._reg.elements
                    for cfg in curr_cfg:
                        self._log.info("Config Agent Account {} being re-added after restart.".
                                       format(cfg.name))
                        self.add_account(cfg)
                else:
                    self._log.debug("No xact handle. Skipping apply config")

                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self._reg,
                xact=xact,
                key_name="name",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_account(cfg)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_account(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_account(cfg)

        # NOTE(review): asyncio.coroutine was removed in Python 3.11. Moving
        # this handler to "async def" should wait until the rift.tasklets
        # wrapper that drives it is confirmed to accept native coroutines.
        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for Config Account """

            action = xact_info.handle.query_action
            self._log.debug("Config account on_prepare config received (action: %s): %s",
                            xact_info.handle.query_action, msg)

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                # If the account already exists, then this is an update.
                if msg.name in self.accounts:
                    self._log.debug("Config account already exists. Invoking on_prepare update request")
                    if msg.has_field("account_type"):
                        raise ConfigAccountError("Cannot change config's account-type")

                    # Since updates are handled by a delete followed by an add, invoke the
                    # delete prepare callbacks to give clients an opportunity to reject.
                    yield from self._config_callbacks.on_delete_prepare(msg.name)

                else:
                    self._log.debug("Config account does not already exist. Invoking on_prepare add request")
                    if not msg.has_field('account_type'):
                        raise ConfigAccountError("New Config account must contain account_type field.")

                    yield from self._config_callbacks.on_add_prepare(msg)

            elif action == rwdts.QueryAction.DELETE:
                # Check if the entire config-agent account got deleted
                fref = ProtobufC.FieldReference.alloc()
                fref.goto_whole_message(msg.to_pbcm())
                if fref.is_field_deleted():
                    yield from self._config_callbacks.on_delete_prepare(msg.name)
                else:
                    self._log.error("Deleting individual fields for config account not supported")
                    xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                    return

            else:
                self._log.error("Action (%s) NOT SUPPORTED", action)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                # Stop here: without this return the NACK above was followed
                # by the ACK below, answering the same transaction twice.
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            xpath = self._project.add_project(ConfigAgentSubscriber.XPATH)
            self._log.debug("Registering for Config Account config using xpath: %s",
                            xpath)
            self._reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER,
                on_prepare=on_prepare,
            )
215 on_prepare=on_prepare,
216 )