New feature: Model changes for project support
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / tasklets / rwvnstasklet / rwvnstasklet.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import logging
20 import os
21 import sys
22
23 import gi
24 gi.require_version('RwVnsYang', '1.0')
25 gi.require_version('RwDts', '1.0')
26 from gi.repository import (
27 RwVnsYang,
28 RwSdnYang,
29 RwDts as rwdts,
30 RwTypes,
31 ProtobufC,
32 )
33
34 import rift.tasklets
35
36 from rift.vlmgr import (
37 VlrDtsHandler,
38 VldDtsHandler,
39 VirtualLinkRecord,
40 )
41
42 from rift.topmgr import (
43 NwtopStaticDtsHandler,
44 NwtopDiscoveryDtsHandler,
45 NwtopDataStore,
46 SdnAccountMgr,
47 )
48
49
class SdnInterfaceError(Exception):
    """Signals a failure while creating an SDN interface."""
53
54
class SdnPluginError(Exception):
    """Signals a failure while creating an SDN plugin."""
58
59
class VlRecordError(Exception):
    """Signals a failure while creating a virtual link record (VLR)."""
63
64
class VlRecordNotFound(Exception):
    """Signals that a requested virtual link record (VLR) does not exist."""
68
class SdnAccountError(Exception):
    """Signals a failure while creating, deleting or updating an SDN account."""
72
class SdnAccountNotFound(Exception):
    """Signals that no SDN account could be identified for a request."""
75
class SDNAccountDtsOperdataHandler(object):
    """DTS publisher for SDN account operational data.

    Registers two DTS endpoints: the per-account ``connection-status``
    operational data and the ``update-sdn-status`` RPC. Account lookups
    and credential validation are delegated to ``parent._acctmgr``.
    """

    def __init__(self, dts, log, loop, parent):
        """Store the DTS handle, logger, event loop and owning manager."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._parent = parent

    def _register_show_status(self):
        """Register as publisher of SDN account ``connection-status``.

        This is a generator and must be driven with ``yield from``.
        """
        def get_xpath(sdn_name=None):
            # With no name given, no key predicate is inserted in the path.
            key = '' if sdn_name is None else "[name='%s']" % sdn_name
            return ("D,/rw-project:project/rw-sdn:sdn/rw-sdn:account{}"
                    "/rw-sdn:connection-status").format(key)

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            schema = RwSdnYang.SDNAccountConfig.schema()
            requested_name = schema.keyspec_to_entry(ks_path).key00.name
            self._log.debug("Got show sdn connection status request: %s", ks_path.create_string())

            try:
                for saved in self._parent._acctmgr.get_saved_sdn_accounts(requested_name):
                    reply = RwSdnYang.SDNAccountConfig()
                    reply.from_dict(saved.as_dict())

                    self._log.debug("Responding to sdn connection status request: %s", reply.connection_status)
                    # One MORE response per account; the final ACK closes
                    # the query once every account has been reported.
                    xact_info.respond_xpath(
                        rwdts.XactRspCode.MORE,
                        xpath=get_xpath(saved.name),
                        msg=reply.connection_status,
                    )
            except KeyError as err:
                self._log.warning(str(err))
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
            else:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        yield from self._dts.register(
            xpath=get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare),
            flags=rwdts.Flag.PUBLISHER,
        )

    def _register_validate_rpc(self):
        """Register the ``update-sdn-status`` RPC handler.

        This is a generator and must be driven with ``yield from``.
        """
        rpc_xpath = "/rw-sdn:update-sdn-status"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            if not msg.has_field("sdn_account"):
                raise SdnAccountNotFound("SDN account name not provided")

            account_name = msg.sdn_account
            if self._parent._acctmgr.get_sdn_account(account_name) is None:
                self._log.warning("SDN account %s does not exist", account_name)
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
                return

            # Validation is only started here; the RPC is acknowledged
            # immediately afterwards.
            self._parent._acctmgr.start_validate_credentials(self._loop, account_name)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        yield from self._dts.register(
            xpath=rpc_xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    @asyncio.coroutine
    def register(self):
        """Register the status publisher, then the validation RPC."""
        yield from self._register_show_status()
        yield from self._register_validate_rpc()
154
class SDNAccountDtsHandler(object):
    """DTS subscriber for SDN account configuration.

    Keeps a local name -> account map and forwards add, update and delete
    requests to the account manager owned by ``parent``.
    """
    XPATH = "C,/rw-project:project/rw-sdn:sdn/rw-sdn:account"

    def __init__(self, dts, log, parent):
        """Store the DTS handle, logger and owning manager."""
        self._dts = dts
        self._log = log
        self._parent = parent

        # Accounts seen so far, keyed by account name.
        self._sdn_account = {}

    def _set_sdn_account(self, account):
        """Record a new SDN account and hand it to the account manager.

        A config whose name is already known is logged and ignored, so an
        existing account is never silently overwritten.
        """
        self._log.info("Setting sdn account: {}".format(account))
        if account.name in self._sdn_account:
            self._log.error("SDN Account with name %s already exists. Ignoring config", account.name)
            # Actually ignore the duplicate, as the log message states.
            return
        self._sdn_account[account.name] = account
        self._parent._acctmgr.set_sdn_account(account)

    def _del_sdn_account(self, account_name):
        """Forget the named account and delete it from the account manager.

        Raises:
            KeyError: if ``account_name`` is not a known account.
        """
        self._log.info("Deleting sdn account: {}".format(account_name))
        del self._sdn_account[account_name]

        self._parent._acctmgr.del_sdn_account(account_name)

    def _update_sdn_account(self, account):
        """Forward an update for an existing account to the account manager."""
        self._log.info("Updating sdn account: {}".format(account))
        # No need to update locally saved sdn_account's updated fields, as they
        # are not used anywhere. Call the parent's update callback.
        self._parent._acctmgr.update_sdn_account(account)

    @asyncio.coroutine
    def register(self):
        """Register for SDN account config changes under ``XPATH``."""
        def apply_config(dts, acg, xact, action, _):
            """Apply callback; all work is done in ``on_prepare``."""
            self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                self._log.debug("No xact handle. Skipping apply config")
                return RwTypes.RwStatus.SUCCESS

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for SDN Account config """

            self._log.info("SDN Cloud account config received: %s", msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete the sdn account record
                self._del_sdn_account(msg.name)
            else:
                # If the account already exists, then this is an update.
                if msg.name in self._sdn_account:
                    self._log.debug("SDN account already exists. Invoking on_prepare update request")
                    # The account type is fixed at creation time.
                    if msg.has_field("account_type"):
                        errmsg = "Cannot update SDN account's account-type."
                        self._log.error(errmsg)
                        xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
                                                   SDNAccountDtsHandler.XPATH,
                                                   errmsg)
                        raise SdnAccountError(errmsg)

                    # Update the sdn account record
                    self._update_sdn_account(msg)
                else:
                    self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
                    if not msg.has_field('account_type'):
                        errmsg = "New SDN account must contain account-type field."
                        self._log.error(errmsg)
                        xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
                                                   SDNAccountDtsHandler.XPATH,
                                                   errmsg)
                        raise SdnAccountError(errmsg)

                    # Set the sdn account record
                    self._set_sdn_account(msg)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for Sdn Account config using xpath: %s",
                        SDNAccountDtsHandler.XPATH,
                        )

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            acg.register(
                xpath=SDNAccountDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare
            )
250
251
class VnsManager(object):
    """Virtual Network Service Manager.

    Owns the DTS handlers for VLRs, VLDs, SDN accounts and network
    topology, and tracks the VirtualLinkRecord objects it creates by id.
    """

    def __init__(self, dts, log, log_hdl, loop):
        """Build every handler; nothing is registered with DTS yet."""
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        # The handlers below receive this manager as their parent, so the
        # construction order is kept exactly as is.
        self._vlr_handler = VlrDtsHandler(dts, log, loop, self)
        self._vld_handler = VldDtsHandler(dts, log, loop, self)
        self._sdn_handler = SDNAccountDtsHandler(dts, log, self)
        self._sdn_opdata_handler = SDNAccountDtsOperdataHandler(dts, log, loop, self)
        self._acctmgr = SdnAccountMgr(self._log, self._log_hdl, self._loop)
        self._nwtopdata_store = NwtopDataStore(log)
        self._nwtopdiscovery_handler = NwtopDiscoveryDtsHandler(
            dts, log, loop, self._acctmgr, self._nwtopdata_store)
        self._nwtopstatic_handler = NwtopStaticDtsHandler(
            dts, log, loop, self._acctmgr, self._nwtopdata_store)
        # vlr id -> VirtualLinkRecord
        self._vlrs = {}

    @asyncio.coroutine
    def register_vlr_handler(self):
        """Register the VLR DTS handler."""
        self._log.debug("Registering DTS VLR handler")
        yield from self._vlr_handler.register()

    @asyncio.coroutine
    def register_vld_handler(self):
        """Register the VLD DTS handler."""
        self._log.debug("Registering DTS VLD handler")
        yield from self._vld_handler.register()

    @asyncio.coroutine
    def register_sdn_handler(self):
        """Register the SDN account config and operational-data handlers."""
        self._log.debug("Registering SDN Account config handler")
        yield from self._sdn_handler.register()
        yield from self._sdn_opdata_handler.register()

    @asyncio.coroutine
    def register_nwtopstatic_handler(self):
        """Register the static network topology DTS handler."""
        self._log.debug("Registering static DTS NW topology handler")
        yield from self._nwtopstatic_handler.register()

    @asyncio.coroutine
    def register_nwtopdiscovery_handler(self):
        """Register the discovery-based network topology DTS handler."""
        self._log.debug("Registering discovery-based DTS NW topology handler")
        yield from self._nwtopdiscovery_handler.register()

    @asyncio.coroutine
    def register(self):
        """Register all static DTS handlers, one after another."""
        yield from self.register_sdn_handler()
        yield from self.register_vlr_handler()
        yield from self.register_vld_handler()
        yield from self.register_nwtopstatic_handler()
        # Not used for now
        yield from self.register_nwtopdiscovery_handler()

    def create_vlr(self, msg):
        """Create the VLR described by ``msg`` and return it.

        If a record with ``msg.id`` already exists, an error is logged and
        the existing record is returned instead of raising.
        """
        vlr_id = msg.id
        if vlr_id not in self._vlrs:
            self._log.info("Creating VirtualLinkRecord %s", vlr_id)
            self._vlrs[vlr_id] = VirtualLinkRecord(
                self._dts,
                self._log,
                self._loop,
                self,
                msg,
                msg.res_id,
            )
        else:
            # Duplicates are tolerated on purpose; VlRecordError is not raised.
            self._log.error("Vlr id %s already exists" % vlr_id)

        return self._vlrs[vlr_id]

    def get_vlr(self, vlr_id):
        """Return the VLR stored under ``vlr_id`` (KeyError if unknown)."""
        return self._vlrs[vlr_id]

    @asyncio.coroutine
    def delete_vlr(self, vlr_id, xact):
        """Terminate and forget the VLR stored under ``vlr_id``.

        Raises:
            VlRecordNotFound: if no VLR with that id is known.
        """
        if vlr_id not in self._vlrs:
            failure = "Delete Failed - Vlr id %s not found" % vlr_id
            self._log.error(failure)
            raise VlRecordNotFound(failure)

        self._log.info("Deleting virtual link id %s", vlr_id)
        # The record is dropped only after terminate() has completed.
        yield from self._vlrs[vlr_id].terminate(xact)
        del self._vlrs[vlr_id]
        self._log.info("Deleted virtual link id %s", vlr_id)

    def find_vlr_by_vld_id(self, vld_id):
        """Return the first VLR whose ``vld_id`` matches, or None."""
        matching = (vlr for vlr in self._vlrs.values() if vlr.vld_id == vld_id)
        return next(matching, None)

    @asyncio.coroutine
    def run(self):
        """Run this VNSM instance by registering its DTS handlers."""
        self._log.debug("Run VNSManager - registering static DTS handlers")
        yield from self.register()

    def vld_in_use(self, vld_id):
        """Report whether the VLD is in use; currently always False."""
        return False

    @asyncio.coroutine
    def publish_vlr(self, xact, path, msg):
        """Publish a VLR through the VLR DTS handler."""
        self._log.debug("Publish vlr called with path %s, msg %s",
                        path, msg)
        yield from self._vlr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def unpublish_vlr(self, xact, path):
        """Unpublish a VLR through the VLR DTS handler."""
        self._log.debug("Unpublish vlr called with path %s", path)
        yield from self._vlr_handler.delete(xact, path)
374
375
class VnsTasklet(rift.tasklets.Tasklet):
    """The VNS tasklet: creates the DTS API handle and runs a VnsManager."""

    def __init__(self, *args, **kwargs):
        """Set up logging categories and empty tasklet state."""
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vns")

        self._dts = None
        self._vlr_handler = None

        self._vnsm = None
        # A mapping of instantiated vlr_id's to VirtualLinkRecord objects
        self._vlrs = {}

    def start(self):
        """Start the tasklet and create its DTS API object."""
        super().start()
        self.log.info("Starting VnsTasklet")

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwVnsYang.get_schema(),
            self.loop,
            self.on_dts_state_change,
        )

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def on_instance_started(self):
        """Callback invoked once the task instance has started."""
        self.log.debug("Got instance started callback")

    def stop(self):
        """Deinitialise DTS; any failure is printed and then re-raised."""
        try:
            self._dts.deinit()
        except Exception as exc:
            print("Caught Exception in VNS stop:", type(exc))
            raise

    @asyncio.coroutine
    def init(self):
        """Task init callback: build the VnsManager and run it."""
        self._vnsm = VnsManager(
            dts=self._dts,
            log=self.log,
            log_hdl=self.log_hdl,
            loop=self.loop,
        )
        yield from self._vnsm.run()

        # NSM needs to detect VLD deletion that has active VLR
        # self._vld_handler = VldDescriptorConfigDtsHandler(
        #         self._dts, self.log, self.loop, self._vlrs,
        #         )
        # yield from self._vld_handler.register()

    @asyncio.coroutine
    def run(self):
        """Tasklet run callback; nothing to do."""

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to request after the given state has been handled.
        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run when the given state is entered.
        app_handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        if state in app_handlers:
            yield from app_handlers[state]()

        # Transition dts to next state
        if state in dts_transitions:
            self._dts.handle.set_state(dts_transitions[state])