New feature: Code changes for project support
[osm/SO.git] / common / python / rift / mano / utils / project.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2017 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import abc
20 import asyncio
21 import logging
22
23 import gi
24 gi.require_version('RwProjectYang', '1.0')
25 gi.require_version('RwDtsYang', '1.0')
26 from gi.repository import (
27 RwProjectYang,
28 RwDts as rwdts,
29 ProtobufC,
30 )
31
32 import rift.tasklets
33
34
35 class ManoProjectError(Exception):
36 pass
37
38
39 class ManoProjNameSetErr(ManoProjectError):
40 pass
41
42
43 class ManoProjXpathNoProjErr(ManoProjectError):
44 pass
45
46
47 class ManoProjXpathKeyErr(ManoProjectError):
48 pass
49
50
51 class ManoProjXpathNotRootErr(ManoProjectError):
52 pass
53
54
55 class ManoProjXpathPresentErr(ManoProjectError):
56 pass
57
58
59 NS = 'rw-project'
60 PROJECT = 'project'
61 NS_PROJECT = '{}:{}'.format(NS, PROJECT)
62 XPATH = '/{}'.format(NS_PROJECT)
63 XPATH_LEN = len(XPATH)
64
65 NAME = 'name'
66 NAME_LEN = len(NAME)
67 NS_NAME = '{}:{}'.format(NS, NAME)
68
69 DEFAULT_PROJECT = 'default'
70 DEFAULT_PREFIX = "{}[{}='{}']".format(XPATH,
71 NS_NAME,
72 DEFAULT_PROJECT)
73
74
75 class ManoProject(object):
76 '''Class to handle the project name'''
77
78 log = None
79
80 @classmethod
81 def instance_from_xpath(cls, xpath, log):
82 name = cls.from_xpath(xpath, log)
83 if name is None:
84 return None
85
86 proj = ManoProject(log, name=name)
87 return proj
88
89 @classmethod
90 def from_xpath(cls, xpath, log):
91 log.debug("Get project name from {}".format(xpath));
92
93 if XPATH in xpath:
94 idx = xpath.find(XPATH)
95 if idx == -1:
96 msg = "Project not found in XPATH: {}".format(xpath)
97 log.error(msg)
98 raise ManoProjXpathNoProjErr(msg)
99
100 sub = xpath[idx+XPATH_LEN:].strip()
101 if (len(sub) < NAME_LEN) or (sub[0] != '['):
102 msg = "Project name not found in XPath: {}".format(xpath)
103 log.error(msg)
104 raise ManoProjXpathKeyErr(msg)
105
106 sub = sub[1:].strip()
107 idx = sub.find(NS_NAME)
108 if idx == -1:
109 idx = sub.find(NAME)
110 if idx != 0:
111 msg = "Project name not found in XPath: {}".format(xpath)
112 log.error(msg)
113 raise ManoProjXpathKeyErr(msg)
114
115 idx = sub.find(']')
116 if idx == -1:
117 msg = "XPath is invalid: {}".format(xpath)
118 log.error(msg)
119 raise ManoProjXpathKeyErr(msg)
120
121 sub = sub[:idx].strip()
122 try:
123 log.debug("Key and value found: {}".format(sub))
124 k, n = sub.split("=", 2)
125 name = n.strip(' \'"')
126 if name is None:
127 msg = "Project name is empty in XPath".format(xpath)
128 log.error(msg)
129 raise ManoProjXpathKeyErr (msg)
130
131 log.debug("Found project name {} from XPath {}".
132 format(name, xpath))
133 return name
134
135 except ValueError as e:
136 msg = "Project name not found in XPath: {}, exception: {}" \
137 .format(xpath, e)
138 log.exception(msg)
139 raise ManoProjXpathKeyErr(msg)
140 else:
141 msg = "Project not found in XPATH: {}".format(xpath)
142 log.error(msg)
143 raise ManoProjXpathNoProjErr(msg)
144
145 @classmethod
146 def get_log(cls):
147 if not cls.log:
148 cls.log = logging.getLogger('rw-mano-log.rw-project')
149 cls.log.setLevel(logging.ERROR)
150
151 @classmethod
152 def prefix_project(cls, xpath, project=None, log=None):
153 if log is None:
154 log = cls.get_log()
155
156 if project is None:
157 project = DEFAULT_PROJECT
158 proj_prefix = DEFAULT_PREFIX
159 else:
160 proj_prefix = "{}[{}='{}']".format(XPATH,
161 NS_NAME,
162 project)
163
164 log.debug("Add project {} to {}".format(project, xpath))
165
166 prefix = ''
167 suffix = xpath
168 idx = xpath.find('C,/')
169 if idx == -1:
170 idx = xpath.find('D,/')
171
172 suffix = xpath
173 if idx != -1:
174 prefix = xpath[:2]
175 suffix = xpath[2:]
176
177 if suffix[0] != '/':
178 msg = "Non-rooted xpath provided: {}".format(xpath)
179 log.error(msg)
180 raise ManoProjXpathNotRootErr(msg)
181
182 idx = suffix.find(XPATH)
183 if idx == 0:
184 name = cls.from_xpath(xpath, log)
185 if name == project:
186 log.warning("Project already in the XPATH: {}".format(xpath))
187 return xpath
188
189 else:
190 msg = "Different project {} already in XPATH {}". \
191 format(name, xpath)
192 log.error(msg)
193 raise ManoProjXpathPresentErr(msg)
194
195 ret = prefix + proj_prefix + suffix
196 log.debug("XPath with project: {}".format(ret))
197 return ret
198
199
200 def __init__(self, log, name=None, tasklet=None):
201 self._log = log
202 self._name = None
203 self._prefix = None
204 self._pbcm = None
205 self._tasklet = None
206 self._dts = None
207 self._loop = None
208 self._log_hdl = None
209
210 # Track if the apply config was received
211 self._apply = False
212
213 if name:
214 self.name = name
215
216 def update(self, tasklet):
217 # Store the commonly used properties from a tasklet
218 self._tasklet = tasklet
219 self._log_hdl = tasklet.log_hdl
220 self._dts = tasklet.dts
221 self._loop = tasklet.loop
222
223 @property
224 def name(self):
225 return self._name
226
227 @property
228 def log(self):
229 return self._log
230
231 @property
232 def prefix(self):
233 return self._prefix
234
235 @property
236 def pbcm(self):
237 return self._pbcm
238
239 @property
240 def config(self):
241 return self._pbcm.project_config
242
243 @property
244 def tasklet(self):
245 return self._tasklet
246
247 @property
248 def log_hdl(self):
249 return self._log_hdl
250
251 @property
252 def dts(self):
253 return self._dts
254
255 @property
256 def loop(self):
257 return self._loop
258
259 @name.setter
260 def name(self, value):
261 if self._name is None:
262 self._name = value
263 self._prefix = "{}[{}='{}']".format(XPATH,
264 NS_NAME,
265 self._name)
266 self._pbcm = RwProjectYang.YangData_RwProject_Project(
267 name=self._name)
268
269 elif self._name == value:
270 self._log.debug("Setting the same name again for project {}".
271 format(value))
272 else:
273 msg = "Project name already set to {}".format(self._name)
274 self._log.error(msg)
275 raise ManoProjNameSetErr(msg)
276
277 def set_from_xpath(self, xpath):
278 self.name = ManoProject.from_xpath(xpath, self._log)
279
280 def add_project(self, xpath):
281 return ManoProject.prefix_project(xpath, log=self._log, project=self._name)
282
283 @abc.abstractmethod
284 @asyncio.coroutine
285 def delete_prepare(self):
286 self._log.debug("Delete prepare for project {}".format(self._name))
287 return True
288
289 @abc.abstractmethod
290 @asyncio.coroutine
291 def register(self):
292 msg = "Register not implemented for project type {}". \
293 format(self.__class__.__name__)
294 self._log.error(msg)
295 raise NotImplementedError(msg)
296
297 @abc.abstractmethod
298 def deregister(self):
299 msg = "De-register not implemented for project type {}". \
300 format(self.__class__.__name__)
301 self._log.error(msg)
302 raise NotImplementedError(msg)
303
304 def rpc_check(self, msg, xact_info=None):
305 '''Check if the rpc is for this project'''
306 try:
307 project = msg.project_name
308 except AttributeError as e:
309 project = DEFAULT_PROJECT
310
311 if project != self.name:
312 self._log.debug("Project {}: RPC is for different project {}".
313 format(self.name, project))
314 if xact_info:
315 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
316 return False
317
318 return True
319
320 @asyncio.coroutine
321 def create_project(self, dts):
322 proj_xpath = "C,{}/project-config".format(self.prefix)
323 self._log.info("Creating project: {} with {}".
324 format(proj_xpath, self.config.as_dict()))
325
326 yield from dts.query_create(proj_xpath,
327 rwdts.XactFlag.ADVISE,
328 self.config)
329
330
331 def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
332 #TODO: Check why this is getting called during project delete
333 if not dts_member_reg:
334 return [], [], []
335
336 # Unforunately, it is currently difficult to figure out what has exactly
337 # changed in this xact without Pbdelta support (RIFT-4916)
338 # As a workaround, we can fetch the pre and post xact elements and
339 # perform a comparison to figure out adds/deletes/updates
340 xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
341 curr_cfgs = list(dts_member_reg.elements)
342
343 xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
344 curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
345
346 # Find Adds
347 added_keys = set(xact_key_map) - set(curr_key_map)
348 added_cfgs = [xact_key_map[key] for key in added_keys]
349
350 # Find Deletes
351 deleted_keys = set(curr_key_map) - set(xact_key_map)
352 deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
353
354 # Find Updates
355 updated_keys = set(curr_key_map) & set(xact_key_map)
356 updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
357
358 return added_cfgs, deleted_cfgs, updated_cfgs
359
360
361 class ProjectConfigCallbacks(object):
362 def __init__(self,
363 on_add_apply=None, on_add_prepare=None,
364 on_delete_apply=None, on_delete_prepare=None):
365
366 @asyncio.coroutine
367 def prepare_noop(*args, **kwargs):
368 pass
369
370 def apply_noop(*args, **kwargs):
371 pass
372
373 self.on_add_apply = on_add_apply
374 self.on_add_prepare = on_add_prepare
375 self.on_delete_apply = on_delete_apply
376 self.on_delete_prepare = on_delete_prepare
377
378 for f in ('on_add_apply', 'on_delete_apply'):
379 ref = getattr(self, f)
380 if ref is None:
381 setattr(self, f, apply_noop)
382 continue
383
384 if asyncio.iscoroutinefunction(ref):
385 raise ValueError('%s cannot be a coroutine' % (f,))
386
387 for f in ('on_add_prepare', 'on_delete_prepare'):
388 ref = getattr(self, f)
389 if ref is None:
390 setattr(self, f, prepare_noop)
391 continue
392
393 if not asyncio.iscoroutinefunction(ref):
394 raise ValueError("%s must be a coroutine" % f)
395
396
397 class ProjectDtsHandler(object):
398 XPATH = "C,{}/project-config".format(XPATH)
399
400 def __init__(self, dts, log, callbacks):
401 self._dts = dts
402 self._log = log
403 self._callbacks = callbacks
404
405 self.reg = None
406 self.projects = []
407
408 @property
409 def log(self):
410 return self._log
411
412 @property
413 def dts(self):
414 return self._dts
415
416 def add_project(self, name):
417 self.log.info("Adding project: {}".format(name))
418
419 if name not in self.projects:
420 self._callbacks.on_add_apply(name)
421 self.projects.append(name)
422 else:
423 self.log.error("Project already present: {}".
424 format(name))
425
426 def delete_project(self, name):
427 self._log.info("Deleting project: {}".format(name))
428 if name in self.projects:
429 self._callbacks.on_delete_apply(name)
430 self.projects.remove(name)
431 else:
432 self.log.error("Unrecognized project: {}".
433 format(name))
434
435 def update_project(self, name):
436 """ Update an existing project
437
438 Currently, we do not take any action on MANO for this,
439 so no callbacks are defined
440
441 Arguments:
442 msg - The project config message
443 """
444 self._log.info("Updating project: {}".format(name))
445 if name in self.projects:
446 pass
447 else:
448 self.log.error("Unrecognized project: {}".
449 format(name))
450
451 def register(self):
452 @asyncio.coroutine
453 def apply_config(dts, acg, xact, action, scratch):
454 self._log.debug("Got project apply config (xact: %s) (action: %s)", xact, action)
455
456 if xact.xact is None:
457 if action == rwdts.AppconfAction.INSTALL:
458 curr_cfg = self._reg.elements
459 for cfg in curr_cfg:
460 self._log.debug("Project being re-added after restart.")
461 self.add_project(cfg)
462 else:
463 # When RIFT first comes up, an INSTALL is called with the current config
464 # Since confd doesn't actally persist data this never has any data so
465 # skip this for now.
466 self._log.debug("No xact handle. Skipping apply config")
467
468 return
469
470 add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
471 dts_member_reg=self._reg,
472 xact=xact,
473 key_name="name_ref",
474 )
475
476 # Handle Deletes
477 for cfg in delete_cfgs:
478 self.delete_project(cfg.name_ref)
479
480 # Handle Adds
481 for cfg in add_cfgs:
482 self.add_project(cfg.name_ref)
483
484 # Handle Updates
485 for cfg in update_cfgs:
486 self.update_project(cfg.name_ref)
487
488 @asyncio.coroutine
489 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
490 """ Prepare callback from DTS for Project """
491
492 # xpath = ks_path.to_xpath(RwProjectYang.get_schema())
493 # name = ManoProject.from_xpath(xpath, self._log)
494 # if not name:
495 # self._log.error("Did not find the project name in ks: {}".
496 # format(xpath))
497 # xact_info.respond_xpath(rwdts.XactRspCode.NACK)
498 # return
499
500 action = xact_info.query_action
501 name = msg.name_ref
502 self._log.debug("Project %s on_prepare config received (action: %s): %s",
503 name, xact_info.query_action, msg)
504
505 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
506 if name in self.projects:
507 self._log.debug("Project {} already exists. Ignore request".
508 format(name))
509
510 else:
511 self._log.debug("Project {}: Invoking on_prepare add request".
512 format(name))
513 yield from self._callbacks.on_add_prepare(name)
514
515 elif action == rwdts.QueryAction.DELETE:
516 # Check if the entire project got deleted
517 fref = ProtobufC.FieldReference.alloc()
518 fref.goto_whole_message(msg.to_pbcm())
519 if fref.is_field_deleted():
520 if name in self.projects:
521 rc = yield from self._callbacks.on_delete_prepare(name)
522 if not rc:
523 self._log.error("Project {} should not be deleted".
524 format(name))
525 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
526 else:
527 self._log.warning("Delete on unknown project: {}".
528 format(name))
529
530 else:
531 self._log.error("Action (%s) NOT SUPPORTED", action)
532 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
533 return
534
535 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
536
537 self._log.debug("Registering for project config using xpath: %s",
538 ProjectDtsHandler.XPATH,
539 )
540
541 acg_handler = rift.tasklets.AppConfGroup.Handler(
542 on_apply=apply_config,
543 )
544
545 with self._dts.appconf_group_create(acg_handler) as acg:
546 self._reg = acg.register(
547 xpath=ProjectDtsHandler.XPATH,
548 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
549 on_prepare=on_prepare,
550 )
551
552 class ProjectHandler(object):
553 def __init__(self, tasklet, project_class, **kw):
554 self._tasklet = tasklet
555 self._log = tasklet.log
556 self._log_hdl = tasklet.log_hdl
557 self._dts = tasklet.dts
558 self._loop = tasklet.loop
559 self._class = project_class
560 self._kw = kw
561
562 self._log.debug("Creating project config handler")
563 self.project_cfg_handler = ProjectDtsHandler(
564 self._dts, self._log,
565 ProjectConfigCallbacks(
566 on_add_apply=self.on_project_added,
567 on_add_prepare=self.on_add_prepare,
568 on_delete_apply=self.on_project_deleted,
569 on_delete_prepare=self.on_delete_prepare,
570 )
571 )
572
573 def _get_tasklet_name(self):
574 return self._tasklet.tasklet_info.instance_name
575
576 def _get_project(self, name):
577 try:
578 proj = self._tasklet.projects[name]
579 except Exception as e:
580 self._log.exception("Project {} ({})not found for tasklet {}: {}".
581 format(name, list(self._tasklet.projects.keys()),
582 self._get_tasklet_name(), e))
583 raise e
584
585 return proj
586
587 def on_project_deleted(self, name):
588 self._log.debug("Project {} deleted".format(name))
589 try:
590 self._get_project(name).deregister()
591 except Exception as e:
592 self._log.exception("Project {} deregister for {} failed: {}".
593 format(name, self._get_tasklet_name(), e))
594
595 try:
596 proj = self._tasklet.projects.pop(name)
597 del proj
598 except Exception as e:
599 self._log.exception("Project {} delete for {} failed: {}".
600 format(name, self._get_tasklet_name(), e))
601
602 def on_project_added(self, name):
603 self._log.debug("Project {} added to tasklet {}".
604 format(name, self._get_tasklet_name()))
605 self._get_project(name)._apply = True
606
607 @asyncio.coroutine
608 def on_add_prepare(self, name):
609 self._log.debug("Project {} to be added to {}".
610 format(name, self._get_tasklet_name()))
611
612 try:
613 self._tasklet.projects[name] = \
614 self._class(name, self._tasklet, **(self._kw))
615 except Exception as e:
616 self._log.exception("Project {} create for {} failed: {}".
617 formatname, self._get_tasklet_name(), e())
618
619 try:
620 yield from self._get_project(name).register()
621 except Exception as e:
622 self._log.exception("Project {} register for tasklet {} failed: {}".
623 format(name, self._get_tasklet_name(), e))
624
625 @asyncio.coroutine
626 def on_delete_prepare(self, name):
627 self._log.debug("Project {} being deleted for tasklet {}".
628 format(name, self._get_tasklet_name()))
629 rc = yield from self._get_project(name).delete_prepare()
630 return rc
631
632 def register(self):
633 self.project_cfg_handler.register()