RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / tasklets / rwvnstasklet / rwvnstasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import logging
20 import os
21 import sys
22
23 import gi
24 gi.require_version('RwVnsYang', '1.0')
25 gi.require_version('RwDts', '1.0')
26 from gi.repository import (
27 RwVnsYang,
28 RwSdnYang,
29 RwDts as rwdts,
30 RwTypes,
31 ProtobufC,
32 )
33
34 import rift.tasklets
35
36 from rift.vlmgr import (
37 VlrDtsHandler,
38 VldDtsHandler,
39 VirtualLinkRecord,
40 )
41
42 from rift.topmgr import (
43 NwtopStaticDtsHandler,
44 NwtopDiscoveryDtsHandler,
45 NwtopDataStore,
46 SdnAccountMgr,
47 )
48
49
50 class SdnInterfaceError(Exception):
51 """ SDN interface creation Error """
52 pass
53
54
55 class SdnPluginError(Exception):
56 """ SDN plugin creation Error """
57 pass
58
59
60 class VlRecordError(Exception):
61 """ Vlr Record creation Error """
62 pass
63
64
65 class VlRecordNotFound(Exception):
66 """ Vlr Record not found"""
67 pass
68
69 class SdnAccountError(Exception):
70 """ Error while creating/deleting/updating SDN Account"""
71 pass
72
73 class SdnAccountNotFound(Exception):
74 pass
75
76 class SDNAccountDtsOperdataHandler(object):
77 def __init__(self, dts, log, loop, parent):
78 self._dts = dts
79 self._log = log
80 self._loop = loop
81 self._parent = parent
82
83 def _register_show_status(self):
84 def get_xpath(sdn_name=None):
85 return "D,/rw-sdn:sdn-account{}/rw-sdn:connection-status".format(
86 "[name='%s']" % sdn_name if sdn_name is not None else ''
87 )
88
89 @asyncio.coroutine
90 def on_prepare(xact_info, action, ks_path, msg):
91 path_entry = RwSdnYang.SDNAccountConfig.schema().keyspec_to_entry(ks_path)
92 sdn_account_name = path_entry.key00.name
93 self._log.debug("Got show sdn connection status request: %s", ks_path.create_string())
94
95 try:
96 saved_accounts = self._parent._acctmgr.get_saved_sdn_accounts(sdn_account_name)
97 for account in saved_accounts:
98 sdn_acct = RwSdnYang.SDNAccountConfig()
99 sdn_acct.from_dict(account.as_dict())
100
101 self._log.debug("Responding to sdn connection status request: %s", sdn_acct.connection_status)
102 xact_info.respond_xpath(
103 rwdts.XactRspCode.MORE,
104 xpath=get_xpath(account.name),
105 msg=sdn_acct.connection_status,
106 )
107 except KeyError as e:
108 self._log.warning(str(e))
109 xact_info.respond_xpath(rwdts.XactRspCode.NA)
110 return
111
112 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
113
114 yield from self._dts.register(
115 xpath=get_xpath(),
116 handler=rift.tasklets.DTS.RegistrationHandler(
117 on_prepare=on_prepare),
118 flags=rwdts.Flag.PUBLISHER,
119 )
120
121 def _register_validate_rpc(self):
122 def get_xpath():
123 return "/rw-sdn:update-sdn-status"
124
125 @asyncio.coroutine
126 def on_prepare(xact_info, action, ks_path, msg):
127 if not msg.has_field("sdn_account"):
128 raise SdnAccountNotFound("SDN account name not provided")
129
130 sdn_account_name = msg.sdn_account
131 account = self._parent._acctmgr.get_sdn_account(sdn_account_name)
132 if account is None:
133 self._log.warning("SDN account %s does not exist", sdn_account_name)
134 xact_info.respond_xpath(rwdts.XactRspCode.NA)
135 return
136
137 self._parent._acctmgr.start_validate_credentials(self._loop, sdn_account_name)
138
139 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
140
141 yield from self._dts.register(
142 xpath=get_xpath(),
143 handler=rift.tasklets.DTS.RegistrationHandler(
144 on_prepare=on_prepare
145 ),
146 flags=rwdts.Flag.PUBLISHER,
147 )
148
149 @asyncio.coroutine
150 def register(self):
151 yield from self._register_show_status()
152 yield from self._register_validate_rpc()
153
154 class SDNAccountDtsHandler(object):
155 XPATH = "C,/rw-sdn:sdn-account"
156
157 def __init__(self, dts, log, parent):
158 self._dts = dts
159 self._log = log
160 self._parent = parent
161
162 self._sdn_account = {}
163
164 def _set_sdn_account(self, account):
165 self._log.info("Setting sdn account: {}".format(account))
166 if account.name in self._sdn_account:
167 self._log.error("SDN Account with name %s already exists. Ignoring config", account.name);
168 self._sdn_account[account.name] = account
169 self._parent._acctmgr.set_sdn_account(account)
170
171 def _del_sdn_account(self, account_name):
172 self._log.info("Deleting sdn account: {}".format(account_name))
173 del self._sdn_account[account_name]
174
175 self._parent._acctmgr.del_sdn_account(account_name)
176
177 def _update_sdn_account(self, account):
178 self._log.info("Updating sdn account: {}".format(account))
179 # No need to update locally saved sdn_account's updated fields, as they
180 # are not used anywhere. Call the parent's update callback.
181 self._parent._acctmgr.update_sdn_account(account)
182
183 @asyncio.coroutine
184 def register(self):
185 def apply_config(dts, acg, xact, action, _):
186 self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
187 if action == rwdts.AppconfAction.INSTALL and xact.id is None:
188 self._log.debug("No xact handle. Skipping apply config")
189 return RwTypes.RwStatus.SUCCESS
190
191 return RwTypes.RwStatus.SUCCESS
192
193 @asyncio.coroutine
194 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
195 """ Prepare callback from DTS for SDN Account config """
196
197 self._log.info("SDN Cloud account config received: %s", msg)
198
199 fref = ProtobufC.FieldReference.alloc()
200 fref.goto_whole_message(msg.to_pbcm())
201
202 if fref.is_field_deleted():
203 # Delete the sdn account record
204 self._del_sdn_account(msg.name)
205 else:
206 # If the account already exists, then this is an update.
207 if msg.name in self._sdn_account:
208 self._log.debug("SDN account already exists. Invoking on_prepare update request")
209 if msg.has_field("account_type"):
210 errmsg = "Cannot update SDN account's account-type."
211 self._log.error(errmsg)
212 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
213 SDNAccountDtsHandler.XPATH,
214 errmsg)
215 raise SdnAccountError(errmsg)
216
217 # Update the sdn account record
218 self._update_sdn_account(msg)
219 else:
220 self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
221 if not msg.has_field('account_type'):
222 errmsg = "New SDN account must contain account-type field."
223 self._log.error(errmsg)
224 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
225 SDNAccountDtsHandler.XPATH,
226 errmsg)
227 raise SdnAccountError(errmsg)
228
229 # Set the sdn account record
230 self._set_sdn_account(msg)
231
232 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
233
234
235 self._log.debug("Registering for Sdn Account config using xpath: %s",
236 SDNAccountDtsHandler.XPATH,
237 )
238
239 acg_handler = rift.tasklets.AppConfGroup.Handler(
240 on_apply=apply_config,
241 )
242
243 with self._dts.appconf_group_create(acg_handler) as acg:
244 acg.register(
245 xpath=SDNAccountDtsHandler.XPATH,
246 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
247 on_prepare=on_prepare
248 )
249
250
251 class VnsManager(object):
252 """ The Virtual Network Service Manager """
253 def __init__(self, dts, log, log_hdl, loop):
254 self._dts = dts
255 self._log = log
256 self._log_hdl = log_hdl
257 self._loop = loop
258 self._vlr_handler = VlrDtsHandler(dts, log, loop, self)
259 self._vld_handler = VldDtsHandler(dts, log, loop, self)
260 self._sdn_handler = SDNAccountDtsHandler(dts,log,self)
261 self._sdn_opdata_handler = SDNAccountDtsOperdataHandler(dts,log, loop, self)
262 self._acctmgr = SdnAccountMgr(self._log, self._log_hdl, self._loop)
263 self._nwtopdata_store = NwtopDataStore(log)
264 self._nwtopdiscovery_handler = NwtopDiscoveryDtsHandler(dts, log, loop, self._acctmgr, self._nwtopdata_store)
265 self._nwtopstatic_handler = NwtopStaticDtsHandler(dts, log, loop, self._acctmgr, self._nwtopdata_store)
266 self._vlrs = {}
267
268 @asyncio.coroutine
269 def register_vlr_handler(self):
270 """ Register vlr DTS handler """
271 self._log.debug("Registering DTS VLR handler")
272 yield from self._vlr_handler.register()
273
274 @asyncio.coroutine
275 def register_vld_handler(self):
276 """ Register vlr DTS handler """
277 self._log.debug("Registering DTS VLD handler")
278 yield from self._vld_handler.register()
279
280 @asyncio.coroutine
281 def register_sdn_handler(self):
282 """ Register vlr DTS handler """
283 self._log.debug("Registering SDN Account config handler")
284 yield from self._sdn_handler.register()
285 yield from self._sdn_opdata_handler.register()
286
287 @asyncio.coroutine
288 def register_nwtopstatic_handler(self):
289 """ Register static NW topology DTS handler """
290 self._log.debug("Registering static DTS NW topology handler")
291 yield from self._nwtopstatic_handler.register()
292
293 @asyncio.coroutine
294 def register_nwtopdiscovery_handler(self):
295 """ Register discovery-based NW topology DTS handler """
296 self._log.debug("Registering discovery-based DTS NW topology handler")
297 yield from self._nwtopdiscovery_handler.register()
298
299 @asyncio.coroutine
300 def register(self):
301 """ Register all static DTS handlers"""
302 yield from self.register_sdn_handler()
303 yield from self.register_vlr_handler()
304 yield from self.register_vld_handler()
305 yield from self.register_nwtopstatic_handler()
306 # Not used for now
307 yield from self.register_nwtopdiscovery_handler()
308
309 def create_vlr(self, msg):
310 """ Create VLR """
311 if msg.id in self._vlrs:
312 err = "Vlr id %s already exists" % msg.id
313 self._log.error(err)
314 # raise VlRecordError(err)
315 return self._vlrs[msg.id]
316
317 self._log.info("Creating VirtualLinkRecord %s", msg.id)
318 self._vlrs[msg.id] = VirtualLinkRecord(self._dts,
319 self._log,
320 self._loop,
321 self,
322 msg,
323 msg.res_id
324 )
325 return self._vlrs[msg.id]
326
327 def get_vlr(self, vlr_id):
328 """ Get VLR by vlr id """
329 return self._vlrs[vlr_id]
330
331 @asyncio.coroutine
332 def delete_vlr(self, vlr_id, xact):
333 """ Delete VLR with the passed id"""
334 if vlr_id not in self._vlrs:
335 err = "Delete Failed - Vlr id %s not found" % vlr_id
336 self._log.error(err)
337 raise VlRecordNotFound(err)
338
339 self._log.info("Deleting virtual link id %s", vlr_id)
340 yield from self._vlrs[vlr_id].terminate(xact)
341 del self._vlrs[vlr_id]
342 self._log.info("Deleted virtual link id %s", vlr_id)
343
344 def find_vlr_by_vld_id(self, vld_id):
345 """ Find a VLR matching the VLD Id """
346 for vlr in self._vlrs.values():
347 if vlr.vld_id == vld_id:
348 return vlr
349 return None
350
351 @asyncio.coroutine
352 def run(self):
353 """ Run this VNSM instance """
354 self._log.debug("Run VNSManager - registering static DTS handlers")
355 yield from self.register()
356
357 def vld_in_use(self, vld_id):
358 """ Is this VLD in use """
359 return False
360
361 @asyncio.coroutine
362 def publish_vlr(self, xact, path, msg):
363 """ Publish a VLR """
364 self._log.debug("Publish vlr called with path %s, msg %s",
365 path, msg)
366 yield from self._vlr_handler.update(xact, path, msg)
367
368 @asyncio.coroutine
369 def unpublish_vlr(self, xact, path):
370 """ Publish a VLR """
371 self._log.debug("Unpublish vlr called with path %s", path)
372 yield from self._vlr_handler.delete(xact, path)
373
374
375 class VnsTasklet(rift.tasklets.Tasklet):
376 """ The VNS tasklet class """
377 def __init__(self, *args, **kwargs):
378 super(VnsTasklet, self).__init__(*args, **kwargs)
379 self.rwlog.set_category("rw-mano-log")
380 self.rwlog.set_subcategory("vns")
381
382 self._dts = None
383 self._vlr_handler = None
384
385 self._vnsm = None
386 # A mapping of instantiated vlr_id's to VirtualLinkRecord objects
387 self._vlrs = {}
388
389 def start(self):
390 super(VnsTasklet, self).start()
391 self.log.info("Starting VnsTasklet")
392
393 self.log.debug("Registering with dts")
394 self._dts = rift.tasklets.DTS(self.tasklet_info,
395 RwVnsYang.get_schema(),
396 self.loop,
397 self.on_dts_state_change)
398
399 self.log.debug("Created DTS Api GI Object: %s", self._dts)
400
401 def on_instance_started(self):
402 """ The task instance started callback"""
403 self.log.debug("Got instance started callback")
404
405 def stop(self):
406 try:
407 self._dts.deinit()
408 except Exception:
409 print("Caught Exception in VNS stop:", sys.exc_info()[0])
410 raise
411
412 @asyncio.coroutine
413 def init(self):
414 """ task init callback"""
415 self._vnsm = VnsManager(dts=self._dts,
416 log=self.log,
417 log_hdl=self.log_hdl,
418 loop=self.loop)
419 yield from self._vnsm.run()
420
421 # NSM needs to detect VLD deletion that has active VLR
422 # self._vld_handler = VldDescriptorConfigDtsHandler(
423 # self._dts, self.log, self.loop, self._vlrs,
424 # )
425 # yield from self._vld_handler.register()
426
427 @asyncio.coroutine
428 def run(self):
429 """ tasklet run callback """
430 pass
431
432 @asyncio.coroutine
433 def on_dts_state_change(self, state):
434 """Take action according to current dts state to transition
435 application into the corresponding application state
436
437 Arguments
438 state - current dts state
439 """
440 switch = {
441 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
442 rwdts.State.CONFIG: rwdts.State.RUN,
443 }
444
445 handlers = {
446 rwdts.State.INIT: self.init,
447 rwdts.State.RUN: self.run,
448 }
449
450 # Transition application to next state
451 handler = handlers.get(state, None)
452 if handler is not None:
453 yield from handler()
454
455 # Transition dts to next state
456 next_state = switch.get(state, None)
457 if next_state is not None:
458 self._dts.handle.set_state(next_state)