update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / sdn / config.py
1
2 #
3 # Copyright 2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import gi
21 gi.require_version('RwDts', '1.0')
22 import rift.tasklets
23
24 from gi.repository import (
25 RwDts as rwdts,
26 ProtobufC,
27 )
28
29 from rift.mano.utils.project import get_add_delete_update_cfgs
30
31 from . import accounts
32
33
34 class SDNAccountNotFound(Exception):
35 pass
36
37
38 class SDNAccountError(Exception):
39 pass
40
41
42 class SDNAccountConfigCallbacks(object):
43 def __init__(self,
44 on_add_apply=None, on_add_prepare=None,
45 on_delete_apply=None, on_delete_prepare=None):
46
47 @asyncio.coroutine
48 def prepare_noop(*args, **kwargs):
49 pass
50
51 def apply_noop(*args, **kwargs):
52 pass
53
54 self.on_add_apply = on_add_apply
55 self.on_add_prepare = on_add_prepare
56 self.on_delete_apply = on_delete_apply
57 self.on_delete_prepare = on_delete_prepare
58
59 for f in ('on_add_apply', 'on_delete_apply'):
60 ref = getattr(self, f)
61 if ref is None:
62 setattr(self, f, apply_noop)
63 continue
64
65 if asyncio.iscoroutinefunction(ref):
66 raise ValueError('%s cannot be a coroutine' % (f,))
67
68 for f in ('on_add_prepare', 'on_delete_prepare'):
69 ref = getattr(self, f)
70 if ref is None:
71 setattr(self, f, prepare_noop)
72 continue
73
74 if not asyncio.iscoroutinefunction(ref):
75 raise ValueError("%s must be a coroutine" % f)
76
77
78 class SDNAccountConfigSubscriber(object):
79 XPATH = "C,/rw-sdn:sdn/rw-sdn:account"
80
81 def __init__(self, dts, log, project, rwlog_hdl, sdn_callbacks, acctstore):
82 self._dts = dts
83 self._log = log
84 self._project = project
85 self._rwlog_hdl = rwlog_hdl
86 self._reg = None
87
88 self.accounts = acctstore
89
90 self._sdn_callbacks = sdn_callbacks
91
92 def add_account(self, account_msg):
93 self._log.info("adding sdn account: {}".format(account_msg))
94
95 account = accounts.SDNAccount(self._log, self._rwlog_hdl, account_msg)
96 self.accounts[account.name] = account
97
98 self._sdn_callbacks.on_add_apply(account)
99
100 def delete_account(self, account_name):
101 self._log.info("deleting sdn account: {}".format(account_name))
102 del self.accounts[account_name]
103
104 self._sdn_callbacks.on_delete_apply(account_name)
105
106 def update_account(self, account_msg):
107 """ Update an existing sdn account
108
109 In order to simplify update, turn an update into a delete followed by
110 an add. The drawback to this approach is that we will not support
111 updates of an "in-use" sdn account, but this seems like a
112 reasonable trade-off.
113
114
115 Arguments:
116 account_msg - The sdn account config message
117 """
118 self._log.info("updating sdn account: {}".format(account_msg))
119
120 self.delete_account(account_msg.name)
121 self.add_account(account_msg)
122
123 def deregister(self):
124 if self._reg:
125 self._reg.deregister()
126 self._reg = None
127
128 def register(self):
129 @asyncio.coroutine
130 def apply_config(dts, acg, xact, action, _):
131 self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
132
133 if xact.xact is None:
134 if action == rwdts.AppconfAction.INSTALL:
135 curr_cfg = self._reg.elements
136 for cfg in curr_cfg:
137 self._log.debug("SDN account being re-added after restart.")
138 if not cfg.has_field('account_type'):
139 raise SDNAccountError("New SDN account must contain account_type field.")
140 self.add_account(cfg)
141 else:
142 # When RIFT first comes up, an INSTALL is called with the current config
143 # Since confd doesn't actally persist data this never has any data so
144 # skip this for now.
145 self._log.debug("No xact handle. Skipping apply config")
146
147 return
148
149 add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
150 dts_member_reg=self._reg,
151 xact=xact,
152 key_name="name",
153 )
154
155 # Handle Deletes
156 for cfg in delete_cfgs:
157 self.delete_account(cfg.name)
158
159 # Handle Adds
160 for cfg in add_cfgs:
161 self.add_account(cfg)
162
163 # Handle Updates
164 for cfg in update_cfgs:
165 self.update_account(cfg)
166
167 @asyncio.coroutine
168 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
169 """ Prepare callback from DTS for SDN Account """
170
171 action = xact_info.query_action
172 self._log.debug("SDN account on_prepare config received (action: %s): %s",
173 xact_info.query_action, msg)
174
175 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
176 if msg.name in self.accounts:
177 self._log.debug("SDN account already exists. Invoking update request")
178
179 # Since updates are handled by a delete followed by an add, invoke the
180 # delete prepare callbacks to give clients an opportunity to reject.
181 yield from self._sdn_callbacks.on_delete_prepare(msg.name)
182
183 else:
184 self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
185 if not msg.has_field('account_type'):
186 raise SDNAccountError("New sdn account must contain account_type field.")
187
188 account = accounts.SDNAccount(self._log, self._rwlog_hdl, msg)
189 yield from self._sdn_callbacks.on_add_prepare(account)
190
191 elif action == rwdts.QueryAction.DELETE:
192 # Check if the entire SDN account got deleted
193 fref = ProtobufC.FieldReference.alloc()
194 fref.goto_whole_message(msg.to_pbcm())
195 if fref.is_field_deleted():
196 yield from self._sdn_callbacks.on_delete_prepare(msg.name)
197
198 else:
199 self._log.error("Deleting individual fields for SDN account not supported")
200 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
201 return
202
203 else:
204 self._log.error("Action (%s) NOT SUPPORTED", action)
205 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
206
207 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
208
209 xpath = self._project.add_project(SDNAccountConfigSubscriber.XPATH)
210 self._log.debug("Registering for SDN Account config using xpath: %s",
211 xpath,
212 )
213
214 acg_handler = rift.tasklets.AppConfGroup.Handler(
215 on_apply=apply_config,
216 )
217
218 with self._dts.appconf_group_create(acg_handler) as acg:
219 self._reg = acg.register(
220 xpath=xpath,
221 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
222 on_prepare=on_prepare,
223 )