RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / topmgr / rwtopmgr.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import gi
21 gi.require_version('RwDts', '1.0')
22 gi.require_version('RwcalYang', '1.0')
23 gi.require_version('RwTypes', '1.0')
24 gi.require_version('RwSdn', '1.0')
25 from gi.repository import (
26 RwDts as rwdts,
27 IetfNetworkYang,
28 IetfNetworkTopologyYang,
29 IetfL2TopologyYang,
30 RwTopologyYang,
31 RwsdnYang,
32 RwTypes
33 )
34
35 from gi.repository.RwTypes import RwStatus
36 import rw_peas
37 import rift.tasklets
38
39 class SdnGetPluginError(Exception):
40 """ Error while fetching SDN plugin """
41 pass
42
43
44 class SdnGetInterfaceError(Exception):
45 """ Error while fetching SDN interface"""
46 pass
47
48
49 class SdnAccountMgr(object):
50 """ Implements the interface to backend plugins to fetch topology """
51 def __init__(self, log, log_hdl, loop):
52 self._account = {}
53 self._log = log
54 self._log_hdl = log_hdl
55 self._loop = loop
56 self._sdn = {}
57
58 self._regh = None
59
60 self._status = RwsdnYang.SDNAccount_ConnectionStatus(
61 status='unknown',
62 details="Connection status lookup not started"
63 )
64
65 self._validate_task = None
66
67 def set_sdn_account(self,account):
68 if (account.name in self._account):
69 self._log.error("SDN Account is already set")
70 else:
71 sdn_account = RwsdnYang.SDNAccount()
72 sdn_account.from_dict(account.as_dict())
73 sdn_account.name = account.name
74 self._account[account.name] = sdn_account
75 self._log.debug("Account set is %s , %s",type(self._account), self._account)
76 self.start_validate_credentials(self._loop, account.name)
77
78 def del_sdn_account(self, name):
79 self._log.debug("Account deleted is %s , %s", type(self._account), name)
80 del self._account[name]
81
82 def update_sdn_account(self,account):
83 self._log.debug("Account updated is %s , %s", type(self._account), account)
84 if account.name in self._account:
85 sdn_account = self._account[account.name]
86
87 sdn_account.from_dict(
88 account.as_dict(),
89 ignore_missing_keys=True,
90 )
91 self._account[account.name] = sdn_account
92 self.start_validate_credentials(self._loop, account.name)
93
94 def get_sdn_account(self, name):
95 """
96 Creates an object for class RwsdnYang.SdnAccount()
97 """
98 if (name in self._account):
99 return self._account[name]
100 else:
101 self._log.error("ERROR : SDN account is not configured")
102
103 def get_saved_sdn_accounts(self, name):
104 ''' Get SDN Account corresponding to passed name, or all saved accounts if name is None'''
105 saved_sdn_accounts = []
106
107 if name is None or name == "":
108 sdn_accounts = list(self._account.values())
109 saved_sdn_accounts.extend(sdn_accounts)
110 elif name in self._account:
111 account = self._account[name]
112 saved_sdn_accounts.append(account)
113 else:
114 errstr = "SDN account {} does not exist".format(name)
115 raise KeyError(errstr)
116
117 return saved_sdn_accounts
118
119 def get_sdn_plugin(self,name):
120 """
121 Loads rw.sdn plugin via libpeas
122 """
123 if (name in self._sdn):
124 return self._sdn[name]
125 account = self.get_sdn_account(name)
126 plugin_name = getattr(account, account.account_type).plugin_name
127 self._log.info("SDN plugin being created")
128 plugin = rw_peas.PeasPlugin(plugin_name, 'RwSdn-1.0')
129 engine, info, extension = plugin()
130
131 self._sdn[name] = plugin.get_interface("Topology")
132 try:
133 rc = self._sdn[name].init(self._log_hdl)
134 assert rc == RwStatus.SUCCESS
135 except:
136 self._log.error("ERROR:SDN plugin instantiation failed ")
137 else:
138 self._log.info("SDN plugin successfully instantiated")
139 return self._sdn[name]
140
141 @asyncio.coroutine
142 def validate_sdn_account_credentials(self, loop, name):
143 self._log.debug("Validating SDN Account credentials %s", name)
144 self._status = RwsdnYang.SDNAccount_ConnectionStatus(
145 status="validating",
146 details="SDN account connection validation in progress"
147 )
148
149 _sdnacct = self.get_sdn_account(name)
150 if (_sdnacct is None):
151 raise SdnGetPluginError
152 _sdnplugin = self.get_sdn_plugin(name)
153 if (_sdnplugin is None):
154 raise SdnGetInterfaceError
155
156 rwstatus, status = yield from loop.run_in_executor(
157 None,
158 _sdnplugin.validate_sdn_creds,
159 _sdnacct,
160 )
161
162 if rwstatus == RwTypes.RwStatus.SUCCESS:
163 self._status = RwsdnYang.SDNAccount_ConnectionStatus.from_dict(status.as_dict())
164 else:
165 self._status = RwsdnYang.SDNAccount_ConnectionStatus(
166 status="failure",
167 details="Error when calling CAL validate sdn creds"
168 )
169
170 self._log.info("Got sdn account validation response: %s", self._status)
171 _sdnacct.connection_status = self._status
172
173 def start_validate_credentials(self, loop, name):
174 if self._validate_task is not None:
175 self._validate_task.cancel()
176 self._validate_task = None
177
178 self._validate_task = asyncio.ensure_future(
179 self.validate_sdn_account_credentials(loop, name),
180 loop=loop
181 )
182
183
184 class NwtopDiscoveryDtsHandler(object):
185 """ Handles DTS interactions for the Discovered Topology registration """
186 DISC_XPATH = "D,/nd:network"
187
188 def __init__(self, dts, log, loop, acctmgr, nwdatastore):
189 self._dts = dts
190 self._log = log
191 self._loop = loop
192 self._acctmgr = acctmgr
193 self._nwdatastore = nwdatastore
194
195 self._regh = None
196
197 @property
198 def regh(self):
199 """ The registration handle associated with this Handler"""
200 return self._regh
201
202 @asyncio.coroutine
203 def register(self):
204 """ Register for the Discovered Topology path """
205
206 @asyncio.coroutine
207 def on_ready(regh, status):
208 """ On_ready for Discovered Topology registration """
209 self._log.debug("PUB reg ready for Discovered Topology handler regn_hdl(%s) status %s",
210 regh, status)
211
212 @asyncio.coroutine
213 def on_prepare(xact_info, action, ks_path, msg):
214 """ prepare for Discovered Topology registration"""
215 self._log.debug(
216 "Got topology on_prepare callback (xact_info: %s, action: %s): %s",
217 xact_info, action, msg
218 )
219
220 if action == rwdts.QueryAction.READ:
221
222 for name in self._acctmgr._account:
223 _sdnacct = self._acctmgr.get_sdn_account(name)
224 if (_sdnacct is None):
225 raise SdnGetPluginError
226
227 _sdnplugin = self._acctmgr.get_sdn_plugin(name)
228 if (_sdnplugin is None):
229 raise SdnGetInterfaceError
230
231 rc, nwtop = _sdnplugin.get_network_list(_sdnacct)
232 #assert rc == RwStatus.SUCCESS
233 if rc != RwStatus.SUCCESS:
234 self._log.error("Fetching get network list for SDN Account %s failed", name)
235 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
236 return
237
238 self._log.debug("Topology: Retrieved network attributes ")
239 for nw in nwtop.network:
240 # Add SDN account name
241 nw.rw_network_attributes.sdn_account_name = name
242 nw.server_provided = False
243 nw.network_id = name + ':' + nw.network_id
244 self._log.debug("...Network id %s", nw.network_id)
245 nw_xpath = ("D,/nd:network[network-id=\'{}\']").format(nw.network_id)
246 xact_info.respond_xpath(rwdts.XactRspCode.MORE,
247 nw_xpath, nw)
248
249 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
250 #err = "%s action on discovered Topology not supported" % action
251 #raise NotImplementedError(err)
252
253 self._log.debug("Registering for discovered topology using xpath %s", NwtopDiscoveryDtsHandler.DISC_XPATH)
254
255 handler = rift.tasklets.DTS.RegistrationHandler(
256 on_ready=on_ready,
257 on_prepare=on_prepare,
258 )
259
260 yield from self._dts.register(
261 NwtopDiscoveryDtsHandler.DISC_XPATH,
262 flags=rwdts.Flag.PUBLISHER,
263 handler=handler
264 )
265
266
267 class NwtopStaticDtsHandler(object):
268 """ Handles DTS interactions for the Static Topology registration """
269 STATIC_XPATH = "C,/nd:network"
270
271 def __init__(self, dts, log, loop, acctmgr, nwdatastore):
272 self._dts = dts
273 self._log = log
274 self._loop = loop
275 self._acctmgr = acctmgr
276
277 self._regh = None
278 self.pending = {}
279 self._nwdatastore = nwdatastore
280
281 @property
282 def regh(self):
283 """ The registration handle associated with this Handler"""
284 return self._regh
285
286
287 @asyncio.coroutine
288 def register(self):
289 """ Register for the Static Topology path """
290
291 @asyncio.coroutine
292 def prepare_nw_cfg(dts, acg, xact, xact_info, ksp, msg, scratch):
293 """Prepare for application configuration. Stash the pending
294 configuration object for subsequent transaction phases"""
295 self._log.debug("Prepare Network config received network id %s, msg %s",
296 msg.network_id, msg)
297 self.pending[xact.id] = msg
298 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
299
300 def apply_nw_config(dts, acg, xact, action, scratch):
301 """Apply the pending configuration object"""
302 if action == rwdts.AppconfAction.INSTALL and xact.id is None:
303 self._log.debug("No xact handle. Skipping apply config")
304 return
305
306 if xact.id not in self.pending:
307 raise KeyError("No stashed configuration found with transaction id [{}]".format(xact.id))
308
309 try:
310 if action == rwdts.AppconfAction.INSTALL:
311 self._nwdatastore.create_network(self.pending[xact.id].network_id, self.pending[xact.id])
312 elif action == rwdts.AppconfAction.RECONCILE:
313 self._nwdatastore.update_network(self.pending[xact.id].network_id, self.pending[xact.id])
314 except:
315 raise
316
317 self._log.debug("Create network config done")
318 return RwTypes.RwStatus.SUCCESS
319
320 self._log.debug("Registering for static topology using xpath %s", NwtopStaticDtsHandler.STATIC_XPATH)
321 handler=rift.tasklets.AppConfGroup.Handler(
322 on_apply=apply_nw_config)
323
324 with self._dts.appconf_group_create(handler=handler) as acg:
325 acg.register(xpath = NwtopStaticDtsHandler.STATIC_XPATH,
326 flags = rwdts.Flag.SUBSCRIBER,
327 on_prepare=prepare_nw_cfg)
328
329