New feature: Code changes for project support
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / topmgr / rwtopmgr.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import gi
21 gi.require_version('RwDts', '1.0')
22 gi.require_version('RwcalYang', '1.0')
23 gi.require_version('RwTypes', '1.0')
24 gi.require_version('RwSdn', '1.0')
25 from gi.repository import (
26 RwDts as rwdts,
27 IetfNetworkYang,
28 IetfNetworkTopologyYang,
29 IetfL2TopologyYang,
30 RwTopologyYang,
31 RwsdnYang,
32 RwTypes
33 )
34
35 from gi.repository.RwTypes import RwStatus
36 import rw_peas
37 import rift.tasklets
38
class SdnGetPluginError(Exception):
    """Error while fetching SDN plugin.

    Raised in this file when the SDN account needed to obtain a plugin
    cannot be found.
    """
42
43
class SdnGetInterfaceError(Exception):
    """Error while fetching SDN interface.

    Raised in this file when no plugin interface is available for an
    SDN account.
    """
47
48
class SdnAccountMgr(object):
    """ Implements the interface to backend plugins to fetch topology

    Configured SDN accounts are kept by name in ``self._account``; the
    plugin interface created for an account is cached under the same name
    in ``self._sdn``.
    """

    def __init__(self, log, log_hdl, loop, project):
        """ Store the collaborators; start with no accounts and no plugins

        Arguments:
            log     - logger used for every message emitted by this class
            log_hdl - log handle passed to a plugin's init()
            loop    - event loop on which credential validation is scheduled
            project - owning project (only stored by this class)
        """
        self._account = {}
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        self._project = project
        self._sdn = {}

        self._regh = None

        self._status = RwsdnYang.SdnConnectionStatus(
            status='unknown',
            details="Connection status lookup not started"
        )

        self._validate_task = None

    def set_sdn_account(self, account):
        """ Store a copy of a new account and start validating its credentials

        An account whose name is already stored is rejected with an error
        log and left unchanged.
        """
        if account.name in self._account:
            self._log.error("SDN Account is already set")
            return

        sdn_account = RwsdnYang.SDNAccount()
        sdn_account.from_dict(account.as_dict())
        sdn_account.name = account.name
        self._account[account.name] = sdn_account
        self._log.debug("Account set is %s , %s", type(self._account), self._account)
        self.start_validate_credentials(self._loop, account.name)

    def del_sdn_account(self, name):
        """ Forget the account called name and any plugin cached for it

        Raises KeyError when no account of that name is stored.
        """
        self._log.debug("Account deleted is %s , %s", type(self._account), name)
        del self._account[name]
        # Drop the cached plugin as well: otherwise an account re-created
        # later under the same name would reuse the old plugin instance.
        self._sdn.pop(name, None)

    def update_sdn_account(self, account):
        """ Merge the given fields into the stored account and re-validate it

        Nothing happens (beyond the debug log) when the account is unknown.
        """
        self._log.debug("Account updated is %s , %s", type(self._account), account)
        if account.name not in self._account:
            return

        sdn_account = self._account[account.name]
        sdn_account.from_dict(
            account.as_dict(),
            ignore_missing_keys=True,
        )
        self._account[account.name] = sdn_account
        self.start_validate_credentials(self._loop, account.name)

    def get_sdn_account(self, name):
        """
        Returns the stored SDN account called name, or None (after logging
        an error) when no such account is configured
        """
        if name in self._account:
            return self._account[name]

        self._log.error("ERROR : SDN account is not configured")
        return None

    def get_saved_sdn_accounts(self, name):
        ''' Get SDN Account corresponding to passed name, or all saved accounts if name is None

        An empty name is treated like None. Raises KeyError when a
        non-empty name does not match any stored account.
        '''
        if name is None or name == "":
            return list(self._account.values())

        if name in self._account:
            return [self._account[name]]

        errstr = "SDN account {} does not exist".format(name)
        raise KeyError(errstr)

    def get_sdn_plugin(self, name):
        """
        Loads rw.sdn plugin via libpeas

        Returns the cached "Topology" interface for account name, creating
        and caching it on first use. Returns None when the account is not
        configured. A plugin whose init() fails is still cached and
        returned; the failure is only logged.
        """
        if name in self._sdn:
            return self._sdn[name]

        account = self.get_sdn_account(name)
        if account is None:
            # get_sdn_account() already logged the problem; callers in this
            # file test the result against None.
            return None

        plugin_name = getattr(account, account.account_type).plugin_name
        self._log.info("SDN plugin being created")
        plugin = rw_peas.PeasPlugin(plugin_name, 'RwSdn-1.0')
        # The call must happen before get_interface(); its three results
        # are not needed here.
        _engine, _info, _extension = plugin()

        sdn = plugin.get_interface("Topology")
        self._sdn[name] = sdn
        try:
            rc = sdn.init(self._log_hdl)
        except Exception as e:
            self._log.error("ERROR:SDN plugin instantiation failed : %s", str(e))
        else:
            # Explicit comparison instead of assert: an assert is stripped
            # under "python -O" and would report every init as successful.
            if rc == RwStatus.SUCCESS:
                self._log.info("SDN plugin successfully instantiated")
            else:
                self._log.error("ERROR:SDN plugin instantiation failed ")
        return sdn

    @asyncio.coroutine
    def validate_sdn_account_credentials(self, loop, name):
        """ Validate the credentials of account name through its plugin

        The blocking validate_sdn_creds() plugin call runs in the loop's
        default executor. The outcome is stored in self._status and in the
        account's connection_status.

        Raises SdnGetPluginError when the account is not configured and
        SdnGetInterfaceError when no plugin is available for it.
        """
        self._log.debug("Validating SDN Account credentials %s", name)
        # NOTE(review): this is the only place that uses
        # SDNAccount_ConnectionStatus; every other status object in this
        # class is an SdnConnectionStatus - confirm both types are intended.
        self._status = RwsdnYang.SDNAccount_ConnectionStatus(
            status="validating",
            details="SDN account connection validation in progress"
        )

        _sdnacct = self.get_sdn_account(name)
        if _sdnacct is None:
            raise SdnGetPluginError
        _sdnplugin = self.get_sdn_plugin(name)
        if _sdnplugin is None:
            raise SdnGetInterfaceError

        rwstatus, status = yield from loop.run_in_executor(
            None,
            _sdnplugin.validate_sdn_creds,
            _sdnacct,
        )

        if rwstatus == RwTypes.RwStatus.SUCCESS:
            self._status = RwsdnYang.SdnConnectionStatus.from_dict(status.as_dict())
        else:
            self._status = RwsdnYang.SdnConnectionStatus(
                status="failure",
                details="Error when calling CAL validate sdn creds"
            )

        self._log.info("Got sdn account validation response: %s", self._status)
        _sdnacct.connection_status = self._status

    def start_validate_credentials(self, loop, name):
        """ Schedule credential validation for account name on loop

        Any validation task started earlier is cancelled first.
        """
        # NOTE(review): a single task slot is shared by all accounts, so
        # starting validation for one account cancels a validation still
        # running for another - confirm this is intended.
        if self._validate_task is not None:
            self._validate_task.cancel()
            self._validate_task = None

        self._validate_task = asyncio.ensure_future(
            self.validate_sdn_account_credentials(loop, name),
            loop=loop
        )
183
184
class NwtopDiscoveryDtsHandler(object):
    """ Handles DTS interactions for the Discovered Topology registration """
    DISC_XPATH = "D,/nd:network"

    def __init__(self, dts, log, loop, project, acctmgr, nwdatastore):
        """ Store the collaborators; no registration is made until register()

        Arguments:
            dts         - DTS handle used to register the publisher
            log         - logger
            loop        - event loop (only stored by this class)
            project     - owning project; its name appears in log messages
            acctmgr     - SDN account manager supplying accounts and plugins
            nwdatastore - network data store (only stored by this class)
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._acctmgr = acctmgr
        self._nwdatastore = nwdatastore

        self._regh = None

    @property
    def regh(self):
        """ The registration handle associated with this Handler"""
        return self._regh

    def deregister(self):
        """ Drop the DTS registration, if one is held """
        self._log.debug("De-register Topology discovery handler for project {}".
                        format(self._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Register for the Discovered Topology path """

        @asyncio.coroutine
        def on_ready(regh, status):
            """ On_ready for Discovered Topology registration """
            self._log.debug("PUB reg ready for Discovered Topology handler regn_hdl(%s) status %s",
                            regh, status)

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare for Discovered Topology registration

            For a READ, every configured SDN account is asked for its
            network list; each network is sent as a MORE response and the
            query is closed with an ACK. A failing account ends the query
            with a NACK.
            """
            self._log.debug(
                "Got topology on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
            )

            # NOTE(review): no response is sent for actions other than
            # READ - confirm that is acceptable to DTS.
            if action == rwdts.QueryAction.READ:

                for name in self._acctmgr._account:
                    _sdnacct = self._acctmgr.get_sdn_account(name)
                    if _sdnacct is None:
                        raise SdnGetPluginError

                    _sdnplugin = self._acctmgr.get_sdn_plugin(name)
                    if _sdnplugin is None:
                        raise SdnGetInterfaceError

                    rc, nwtop = _sdnplugin.get_network_list(_sdnacct)
                    if rc != RwStatus.SUCCESS:
                        self._log.error("Fetching get network list for SDN Account %s failed", name)
                        xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                        return

                    self._log.debug("Topology: Retrieved network attributes ")
                    for nw in nwtop.network:
                        # Add SDN account name
                        nw.rw_network_attributes.sdn_account_name = name
                        nw.server_provided = False
                        # Prefix the id with the account name so networks
                        # reported by different accounts get distinct ids.
                        nw.network_id = name + ':' + nw.network_id
                        self._log.debug("...Network id %s", nw.network_id)
                        nw_xpath = ("D,/nd:network[network-id=\'{}\']").format(nw.network_id)
                        xact_info.respond_xpath(rwdts.XactRspCode.MORE,
                                                nw_xpath, nw)

                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for discovered topology using xpath %s", NwtopDiscoveryDtsHandler.DISC_XPATH)

        handler = rift.tasklets.DTS.RegistrationHandler(
            on_ready=on_ready,
            on_prepare=on_prepare,
        )

        # Keep the handle: without it regh stays None and deregister()
        # can never remove this registration.
        self._regh = yield from self._dts.register(
            NwtopDiscoveryDtsHandler.DISC_XPATH,
            flags=rwdts.Flag.PUBLISHER,
            handler=handler
        )
274
275
class NwtopStaticDtsHandler(object):
    """ DTS glue for the Static Topology configuration path """
    STATIC_XPATH = "C,/nd:network"

    def __init__(self, dts, log, loop, project, acctmgr, nwdatastore):
        """ Remember the collaborators; registration happens in register() """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._acctmgr = acctmgr

        self._regh = None
        # Configuration messages stashed by prepare, keyed by transaction id.
        self.pending = {}
        self._nwdatastore = nwdatastore

    @property
    def regh(self):
        """ Registration handle held by this handler (None when unregistered) """
        return self._regh

    def deregister(self):
        """ Release the DTS registration when one is held """
        self._log.debug("De-register Topology static handler for project {}".
                        format(self._project.name))
        if not self._regh:
            return
        self._regh.deregister()
        self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Subscribe to the Static Topology configuration path """

        @asyncio.coroutine
        def prepare_nw_cfg(dts, acg, xact, xact_info, ksp, msg, scratch):
            """ Prepare phase: stash the incoming network configuration
            under its transaction id and acknowledge the transaction """
            self._log.debug("Prepare Network config received network id %s, msg %s",
                            msg.network_id, msg)
            self.pending[xact.id] = msg
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        def apply_nw_config(dts, acg, xact, action, scratch):
            """ Apply phase: hand the stashed configuration to the network
            data store (create on INSTALL, update on RECONCILE) """
            installing = action == rwdts.AppconfAction.INSTALL
            if installing and xact.id is None:
                self._log.debug("No xact handle. Skipping apply config")
                return

            xact_id = xact.id
            if xact_id not in self.pending:
                raise KeyError("No stashed configuration found with transaction id [{}]".format(xact_id))

            # NOTE(review): stashed entries are never removed from
            # self.pending - confirm whether that is intended.
            stashed = self.pending[xact_id]
            if installing:
                self._nwdatastore.create_network(stashed.network_id, stashed)
            elif action == rwdts.AppconfAction.RECONCILE:
                self._nwdatastore.update_network(stashed.network_id, stashed)

            self._log.debug("Create network config done")
            return RwTypes.RwStatus.SUCCESS

        self._log.debug("Registering for static topology using xpath %s", NwtopStaticDtsHandler.STATIC_XPATH)
        apply_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_nw_config)

        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            self._regh = acg.register(
                xpath=NwtopStaticDtsHandler.STATIC_XPATH,
                flags=rwdts.Flag.SUBSCRIBER,
                on_prepare=prepare_nw_cfg)
342 flags = rwdts.Flag.SUBSCRIBER,
343 on_prepare=prepare_nw_cfg)