1720e3912e4abee33c82942eea4de82cc165bcbc
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / tasklets / rwvnstasklet / rwvnstasklet.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import logging
20 import os
21 import sys
22
23 import gi
24 gi.require_version('RwVnsYang', '1.0')
25 gi.require_version('RwDts', '1.0')
26 from gi.repository import (
27 RwVnsYang,
28 RwSdnYang,
29 RwDts as rwdts,
30 RwTypes,
31 ProtobufC,
32 )
33
34 import rift.tasklets
35 from rift.mano.utils.project import (
36 ManoProject,
37 ProjectHandler,
38 )
39
40 from rift.vlmgr import (
41 VlrDtsHandler,
42 VldDtsHandler,
43 VirtualLinkRecord,
44 )
45
46 from rift.topmgr import (
47 NwtopStaticDtsHandler,
48 NwtopDiscoveryDtsHandler,
49 NwtopDataStore,
50 SdnAccountMgr,
51 )
52
53
class SdnInterfaceError(Exception):
    """Raised when an SDN interface cannot be created."""
57
58
class SdnPluginError(Exception):
    """Raised when an SDN plugin cannot be created."""
62
63
class VlRecordError(Exception):
    """Raised when a virtual link record (VLR) cannot be created."""
67
68
class VlRecordNotFound(Exception):
    """Raised when a requested virtual link record (VLR) does not exist."""
72
class SdnAccountError(Exception):
    """Raised when creating, deleting or updating an SDN account fails."""
76
class SdnAccountNotFound(Exception):
    """Raised when a request does not identify an SDN account."""
79
class SDNAccountDtsOperdataHandler(object):
    """Serve SDN account connection status and the status-refresh RPC.

    Two DTS registrations are owned by this object: a publisher for the
    per-account ``connection-status`` path and a handler for the
    ``/rw-sdn:update-sdn-status`` RPC.  Account data and credential
    validation are delegated to the parent's ``_acctmgr``.
    """

    def __init__(self, dts, log, loop, parent):
        """Store collaborators; nothing is registered until register() runs.

        Arguments:
            dts    - DTS API handle used to create the registrations
            log    - logger
            loop   - event loop handed to credential validation
            parent - owner object exposing ``_project`` and ``_acctmgr``
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._parent = parent
        self._project = parent._project
        # Registration handles; populated by register(), cleared by deregister()
        self._regh = None
        self._rpch = None

    def _register_show_status(self):
        """Register the connection-status publisher (generator; use yield from)."""
        status_template = ("D,/rw-sdn:sdn/rw-sdn:account{}"
                           "/rw-sdn:connection-status")

        def get_xpath(sdn_name=None):
            # Without a name the xpath addresses every account (no key predicate)
            if sdn_name is None:
                key_predicate = ''
            else:
                key_predicate = "[name='%s']" % sdn_name
            return self._project.add_project(status_template.format(key_predicate))

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            schema = RwSdnYang.SDNAccountConfig.schema()
            requested_name = schema.keyspec_to_entry(ks_path).key00.name
            self._log.debug("Got show sdn connection status request: %s", ks_path.create_string())

            try:
                acctmgr = self._parent._acctmgr
                for saved in acctmgr.get_saved_sdn_accounts(requested_name):
                    status_msg = RwSdnYang.SDNAccountConfig()
                    status_msg.from_dict(saved.as_dict())

                    self._log.debug("Responding to sdn connection status request: %s", status_msg.connection_status)
                    # One MORE response per account; the final ACK closes the query
                    xact_info.respond_xpath(
                        rwdts.XactRspCode.MORE,
                        xpath=get_xpath(saved.name),
                        msg=status_msg.connection_status,
                    )
            except KeyError as exc:
                self._log.warning(str(exc))
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        publish_xpath = get_xpath()
        status_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        self._regh = yield from self._dts.register(
            xpath=publish_xpath,
            handler=status_handler,
            flags=rwdts.Flag.PUBLISHER,
        )

    def _register_validate_rpc(self):
        """Register the update-sdn-status RPC handler (generator; use yield from)."""
        rpc_xpath = "/rw-sdn:update-sdn-status"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            # rpc_check receives xact_info; a falsy result means stop here
            if not self._project.rpc_check(msg, xact_info=xact_info):
                return

            if not msg.has_field("sdn_account"):
                raise SdnAccountNotFound("SDN account name not provided")

            target_name = msg.sdn_account
            acctmgr = self._parent._acctmgr
            if acctmgr.get_sdn_account(target_name) is None:
                self._log.warning("SDN account %s does not exist", target_name)
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
                return

            # Validation is only started here; the RPC is acknowledged right away
            acctmgr.start_validate_credentials(self._loop, target_name)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        rpc_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        self._rpch = yield from self._dts.register(
            xpath=rpc_xpath,
            handler=rpc_handler,
            flags=rwdts.Flag.PUBLISHER,
        )

    @asyncio.coroutine
    def register(self):
        """Create both registrations: status publisher first, then the RPC."""
        yield from self._register_show_status()
        yield from self._register_validate_rpc()

    def deregister(self):
        """Drop whichever registrations exist and forget their handles."""
        self._log.debug("De-register SDN opdata handler for project {}".
                        format(self._project.name))
        for handle_attr in ("_regh", "_rpch"):
            handle = getattr(self, handle_attr)
            if handle:
                handle.deregister()
                setattr(self, handle_attr, None)
176
177
class SDNAccountDtsHandler(object):
    """Subscribe to SDN account configuration and relay it to the account manager.

    Accounts seen so far are tracked by name in ``_sdn_account`` so that an
    incoming config message can be classified as add, update or delete.  The
    actual work is delegated to the parent's ``_acctmgr``.
    """
    XPATH = "C,/rw-sdn:sdn/rw-sdn:account"

    def __init__(self, dts, log, parent):
        """Store collaborators; nothing is registered until register() runs.

        Arguments:
            dts    - DTS API handle used to create the appconf group
            log    - logger
            parent - owner object exposing ``_project`` and ``_acctmgr``
        """
        self._dts = dts
        self._log = log
        self._parent = parent
        self._project = parent._project

        # Known accounts keyed by account name
        self._sdn_account = {}
        self._regh = None

    @property
    def _xpath(self):
        """Project-qualified form of XPATH."""
        return self._project.add_project(SDNAccountDtsHandler.XPATH)

    def _set_sdn_account(self, account):
        """Record a new account and hand it to the account manager.

        A duplicate name is logged and ignored, as the log message states;
        previously the existing entry was silently overwritten.
        """
        self._log.info("Setting sdn account: {}".format(account))
        if account.name in self._sdn_account:
            self._log.error("SDN Account with name %s already exists. Ignoring config", account.name)
            return
        self._sdn_account[account.name] = account
        self._parent._acctmgr.set_sdn_account(account)

    def _del_sdn_account(self, account_name):
        """Forget an account and ask the account manager to delete it.

        Raises:
            KeyError - if account_name was never recorded by this handler
        """
        self._log.info("Deleting sdn account: {}".format(account_name))
        del self._sdn_account[account_name]

        self._parent._acctmgr.del_sdn_account(account_name)

    def _update_sdn_account(self, account):
        """Forward an update for an existing account to the account manager."""
        self._log.info("Updating sdn account: {}".format(account))
        # No need to update locally saved sdn_account's updated fields, as they
        # are not used anywhere. Call the parent's update callback.
        self._parent._acctmgr.update_sdn_account(account)

    @asyncio.coroutine
    def register(self):
        """Create the appconf group and subscribe to SDN account config."""

        def apply_config(dts, acg, xact, action, _):
            """Apply callback; all work happens in on_prepare, so always succeed."""
            self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                self._log.debug("No xact handle. Skipping apply config")
                return RwTypes.RwStatus.SUCCESS

            return RwTypes.RwStatus.SUCCESS

        def reject(xact_info, errmsg):
            """Log errmsg, report it on the transaction and raise SdnAccountError."""
            self._log.error(errmsg)
            xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
                                       self._xpath,
                                       errmsg)
            raise SdnAccountError(errmsg)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for SDN Account config """

            self._log.info("SDN Cloud account config received: %s", msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete the sdn account record
                self._del_sdn_account(msg.name)
            elif msg.name in self._sdn_account:
                # If the account already exists, then this is an update.
                self._log.debug("SDN account already exists. Invoking on_prepare update request")
                if msg.has_field("account_type"):
                    reject(xact_info, "Cannot update SDN account's account-type.")

                # Update the sdn account record
                self._update_sdn_account(msg)
            else:
                self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
                if not msg.has_field('account_type'):
                    reject(xact_info, "New SDN account must contain account-type field.")

                # Set the sdn account record
                self._set_sdn_account(msg)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for Sdn Account config using xpath: {}".
                        format(self._xpath))

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            self._regh = acg.register(
                xpath=self._xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare
            )

    def deregister(self):
        """Drop the config registration, if any, and forget its handle."""
        # Message previously said "VLR handler" (copy-paste from another handler)
        self._log.debug("De-register SDN account handler for project {}".
                        format(self._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None
285
286
class VnsManager(object):
    """Per-project Virtual Network Service manager.

    Builds the DTS handlers for VLR, VLD, SDN accounts and network topology,
    and keeps the VirtualLinkRecord objects created for this project in
    ``_vlrs``, keyed by VLR id.
    """

    def __init__(self, dts, log, log_hdl, loop, project):
        """Build all handlers; none is registered until register()/run().

        Arguments:
            dts     - DTS API handle
            log     - logger
            log_hdl - log handle passed to the SDN account manager
            loop    - event loop
            project - project object used to qualify xpaths
        """
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        self._project = project

        # Construction order is kept as-is: the SDN handlers read
        # self._project from this object in their constructors.
        self._vlr_handler = VlrDtsHandler(dts, log, loop, self)
        self._vld_handler = VldDtsHandler(dts, log, loop, self)
        self._sdn_handler = SDNAccountDtsHandler(dts, log, self)
        self._sdn_opdata_handler = SDNAccountDtsOperdataHandler(dts, log, loop, self)
        self._acctmgr = SdnAccountMgr(self._log, self._log_hdl, self._loop, self._project)
        self._nwtopdata_store = NwtopDataStore(log)
        self._nwtopdiscovery_handler = NwtopDiscoveryDtsHandler(
            dts, log, loop, self._project, self._acctmgr, self._nwtopdata_store)
        self._nwtopstatic_handler = NwtopStaticDtsHandler(
            dts, log, loop, self._project, self._acctmgr, self._nwtopdata_store)
        self._vlrs = {}

    @asyncio.coroutine
    def register_vlr_handler(self):
        """ Register the VLR DTS handler """
        self._log.debug("Registering DTS VLR handler")
        yield from self._vlr_handler.register()

    @asyncio.coroutine
    def register_vld_handler(self):
        """ Register the VLD DTS handler """
        self._log.debug("Registering DTS VLD handler")
        yield from self._vld_handler.register()

    @asyncio.coroutine
    def register_sdn_handler(self):
        """ Register the SDN account config and operational-data handlers """
        self._log.debug("Registering SDN Account config handler")
        yield from self._sdn_handler.register()
        yield from self._sdn_opdata_handler.register()

    @asyncio.coroutine
    def register_nwtopstatic_handler(self):
        """ Register the static network topology DTS handler """
        self._log.debug("Registering static DTS NW topology handler")
        yield from self._nwtopstatic_handler.register()

    @asyncio.coroutine
    def register_nwtopdiscovery_handler(self):
        """ Register the discovery-based network topology DTS handler """
        self._log.debug("Registering discovery-based DTS NW topology handler")
        yield from self._nwtopdiscovery_handler.register()

    @asyncio.coroutine
    def register(self):
        """ Register every DTS handler, one after another """
        registration_steps = (
            self.register_sdn_handler,
            self.register_vlr_handler,
            self.register_vld_handler,
            self.register_nwtopstatic_handler,
            # NOTE(review): the original comment marks discovery-based
            # topology as "not used for now", yet it is still registered.
            self.register_nwtopdiscovery_handler,
        )
        for step in registration_steps:
            yield from step()

    def deregister(self):
        """ Deregister every handler, in reverse registration order """
        teardown_order = (
            self._nwtopdiscovery_handler,
            self._nwtopstatic_handler,
            self._vld_handler,
            self._vlr_handler,
            self._sdn_opdata_handler,
            self._sdn_handler,
        )
        for handler in teardown_order:
            handler.deregister()

    def create_vlr(self, msg):
        """ Return the VLR for msg.id, creating it when it is not yet known """
        vlr_id = msg.id
        if vlr_id in self._vlrs:
            # A duplicate is logged as an error, but the existing record is
            # returned instead of raising.
            self._log.error("Vlr id %s already exists" % vlr_id)
            return self._vlrs[vlr_id]

        self._log.info("Creating VirtualLinkRecord %s", vlr_id)
        record = VirtualLinkRecord(
            self._dts, self._log, self._loop, self, msg, msg.res_id)
        self._vlrs[vlr_id] = record
        return record

    def get_vlr(self, vlr_id):
        """ Look up a VLR by id; raises KeyError when the id is unknown """
        return self._vlrs[vlr_id]

    @asyncio.coroutine
    def delete_vlr(self, vlr_id, xact):
        """ Terminate the VLR with the given id and forget it

        Raises:
            VlRecordNotFound - if no VLR with that id is known
        """
        if vlr_id not in self._vlrs:
            failure = "Delete Failed - Vlr id %s not found" % vlr_id
            self._log.error(failure)
            raise VlRecordNotFound(failure)

        self._log.info("Deleting virtual link id %s", vlr_id)
        record = self._vlrs[vlr_id]
        yield from record.terminate(xact)
        # The entry is removed only after terminate() has completed
        del self._vlrs[vlr_id]
        self._log.info("Deleted virtual link id %s", vlr_id)

    def find_vlr_by_vld_id(self, vld_id):
        """ Return the first VLR whose vld_id matches, or None """
        matches = (vlr for vlr in self._vlrs.values() if vlr.vld_id == vld_id)
        return next(matches, None)

    @asyncio.coroutine
    def run(self):
        """ Run this VNSM instance by registering all DTS handlers """
        self._log.debug("Run VNSManager - registering static DTS handlers")
        yield from self.register()

    def vld_in_use(self, vld_id):
        """ Report whether the VLD is in use; currently always False """
        return False

    @asyncio.coroutine
    def publish_vlr(self, xact, xpath, msg):
        """ Publish a VLR at the project-qualified form of xpath """
        project_path = self._project.add_project(xpath)
        self._log.debug("Publish vlr called with path %s, msg %s",
                        project_path, msg)
        yield from self._vlr_handler.update(xact, project_path, msg)

    @asyncio.coroutine
    def unpublish_vlr(self, xact, xpath):
        """ Unpublish the VLR at the project-qualified form of xpath """
        project_path = self._project.add_project(xpath)
        self._log.debug("Unpublish vlr called with path %s", project_path)
        yield from self._vlr_handler.delete(xact, project_path)
423
424
class VnsProject(ManoProject):
    """Project wrapper that owns one VnsManager."""

    def __init__(self, name, tasklet, **kw):
        """Initialise from the tasklet; the manager is built in register().

        Arguments:
            name    - project name
            tasklet - owning tasklet, passed to update()
            kw      - accepted and ignored
        """
        super().__init__(tasklet.log, name)
        self.update(tasklet)

        self._vlr_handler = None
        self._vnsm = None
        # A mapping of instantiated vlr_id's to VirtualLinkRecord objects
        self._vlrs = {}

    @asyncio.coroutine
    def register(self):
        """Create the project's VnsManager and run it."""
        manager = VnsManager(
            dts=self._dts,
            log=self.log,
            log_hdl=self._log_hdl,
            loop=self._loop,
            project=self,
        )
        self._vnsm = manager
        yield from manager.run()

        # NSM needs to detect VLD deletion that has active VLR; a
        # VldDescriptorConfigDtsHandler registration used to be sketched
        # here (commented out) and is not active.

    def deregister(self):
        """Deregister the VnsManager created by register()."""
        self._log.debug("De-register project {}".format(self.name))
        self._vnsm.deregister()
454
455
class VnsTasklet(rift.tasklets.Tasklet):
    """ The VNS tasklet class """
    def __init__(self, *args, **kwargs):
        """Set up logging categories and empty DTS/project state."""
        super(VnsTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vns")

        self._dts = None
        self._project_handler = None
        self.projects = {}

    @property
    def dts(self):
        """DTS API handle; None until start() has run."""
        return self._dts

    @property
    def project_handler(self):
        """Project handler; None until init() has run.

        Backed by ``_project_handler`` so the attribute initialised in
        __init__ and the one assigned in init() are the same object.
        """
        return self._project_handler

    @project_handler.setter
    def project_handler(self, handler):
        # Setter kept so external assignment to ``project_handler`` still works
        self._project_handler = handler

    def start(self):
        """Start the tasklet and create the DTS API object."""
        super(VnsTasklet, self).start()
        self.log.info("Starting VnsTasklet")

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(self.tasklet_info,
                                      RwVnsYang.get_schema(),
                                      self.loop,
                                      self.on_dts_state_change)

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def on_instance_started(self):
        """ The task instance started callback"""
        self.log.debug("Got instance started callback")

    def stop(self):
        """De-initialise DTS; any failure is printed and re-raised."""
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNS stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """ task init callback"""
        self.log.debug("creating project handler")
        self._project_handler = ProjectHandler(self, VnsProject)
        self._project_handler.register()

    @asyncio.coroutine
    def run(self):
        """ tasklet run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to request once the handler for `state` has finished
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run on entering `state`
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self._dts.handle.set_state(next_state)