Merge from OSM SO master
[osm/SO.git] / common / python / rift / mano / utils / project.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2017 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import abc
20 import asyncio
21 import logging
22
23 import gi
24 gi.require_version('RwProjectManoYang', '1.0')
25 gi.require_version('RwDts', '1.0')
26 from gi.repository import (
27 RwProjectManoYang,
28 RwDts as rwdts,
29 ProtobufC,
30 RwTypes,
31 )
32
33 import rift.tasklets
34
35
36 class ManoProjectError(Exception):
37 pass
38
39
40 class ManoProjNameSetErr(ManoProjectError):
41 pass
42
43
44 class ManoProjXpathNoProjErr(ManoProjectError):
45 pass
46
47
48 class ManoProjXpathKeyErr(ManoProjectError):
49 pass
50
51
52 class ManoProjXpathNotRootErr(ManoProjectError):
53 pass
54
55
56 class ManoProjXpathPresentErr(ManoProjectError):
57 pass
58
59
60 NS = 'rw-project'
61 PROJECT = 'project'
62 NS_PROJECT = '{}:{}'.format(NS, PROJECT)
63 XPATH = '/{}'.format(NS_PROJECT)
64 XPATH_LEN = len(XPATH)
65
66 NAME = 'name'
67 NAME_LEN = len(NAME)
68 NS_NAME = '{}:{}'.format(NS, NAME)
69
70 DEFAULT_PROJECT = 'default'
71 DEFAULT_PREFIX = "{}[{}='{}']".format(XPATH,
72 NS_NAME,
73 DEFAULT_PROJECT)
74
75
76 class ManoProject(object):
77 '''Class to handle the project name'''
78
79 log = None
80
81 @classmethod
82 def instance_from_xpath(cls, xpath, log):
83 name = cls.from_xpath(xpath, log)
84 if name is None:
85 return None
86
87 proj = ManoProject(log, name=name)
88 return proj
89
90 @classmethod
91 def from_xpath(cls, xpath, log):
92 log.debug("Get project name from {}".format(xpath));
93
94 if XPATH in xpath:
95 idx = xpath.find(XPATH)
96 if idx == -1:
97 msg = "Project not found in XPATH: {}".format(xpath)
98 log.error(msg)
99 raise ManoProjXpathNoProjErr(msg)
100
101 sub = xpath[idx+XPATH_LEN:].strip()
102 if (len(sub) < NAME_LEN) or (sub[0] != '['):
103 msg = "Project name not found in XPath: {}".format(xpath)
104 log.error(msg)
105 raise ManoProjXpathKeyErr(msg)
106
107 sub = sub[1:].strip()
108 idx = sub.find(NS_NAME)
109 if idx == -1:
110 idx = sub.find(NAME)
111 if idx != 0:
112 msg = "Project name not found in XPath: {}".format(xpath)
113 log.error(msg)
114 raise ManoProjXpathKeyErr(msg)
115
116 idx = sub.find(']')
117 if idx == -1:
118 msg = "XPath is invalid: {}".format(xpath)
119 log.error(msg)
120 raise ManoProjXpathKeyErr(msg)
121
122 sub = sub[:idx].strip()
123 try:
124 log.debug("Key and value found: {}".format(sub))
125 k, n = sub.split("=", 2)
126 name = n.strip(' \'"')
127 if name is None:
128 msg = "Project name is empty in XPath".format(xpath)
129 log.error(msg)
130 raise ManoProjXpathKeyErr (msg)
131
132 log.debug("Found project name {} from XPath {}".
133 format(name, xpath))
134 return name
135
136 except ValueError as e:
137 msg = "Project name not found in XPath: {}, exception: {}" \
138 .format(xpath, e)
139 log.exception(msg)
140 raise ManoProjXpathKeyErr(msg)
141 else:
142 msg = "Project not found in XPATH: {}".format(xpath)
143 log.error(msg)
144 raise ManoProjXpathNoProjErr(msg)
145
146 @classmethod
147 def get_log(cls):
148 if not cls.log:
149 cls.log = logging.getLogger('rw-mano-log.rw-project')
150 cls.log.setLevel(logging.ERROR)
151
152 @classmethod
153 def prefix_project(cls, xpath, project=None, log=None):
154 if log is None:
155 log = cls.get_log()
156
157 if project is None:
158 project = DEFAULT_PROJECT
159 proj_prefix = DEFAULT_PREFIX
160 else:
161 proj_prefix = "{}[{}='{}']".format(XPATH,
162 NS_NAME,
163 project)
164
165 log.debug("Add project {} to {}".format(project, xpath))
166
167 prefix = ''
168 suffix = xpath
169 idx = xpath.find('C,/')
170 if idx == -1:
171 idx = xpath.find('D,/')
172
173 suffix = xpath
174 if idx != -1:
175 prefix = xpath[:2]
176 suffix = xpath[2:]
177
178 if suffix[0] != '/':
179 msg = "Non-rooted xpath provided: {}".format(xpath)
180 log.error(msg)
181 raise ManoProjXpathNotRootErr(msg)
182
183 idx = suffix.find(XPATH)
184 if idx == 0:
185 name = cls.from_xpath(xpath, log)
186 if name == project:
187 log.debug("Project already in the XPATH: {}".format(xpath))
188 return xpath
189
190 else:
191 msg = "Different project {} already in XPATH {}". \
192 format(name, xpath)
193 log.error(msg)
194 raise ManoProjXpathPresentErr(msg)
195
196 ret = prefix + proj_prefix + suffix
197 log.debug("XPath with project: {}".format(ret))
198 return ret
199
200
201 def __init__(self, log, name=None, tasklet=None):
202 self._log = log
203 self._name = None
204 self._prefix = None
205 self._pbcm = None
206 self._tasklet = None
207 self._dts = None
208 self._loop = None
209 self._log_hdl = None
210
211 # Track if the apply config was received
212 self._apply = False
213
214 if name:
215 self.name = name
216
217 def update(self, tasklet):
218 # Store the commonly used properties from a tasklet
219 self._tasklet = tasklet
220 self._log_hdl = tasklet.log_hdl
221 self._dts = tasklet.dts
222 self._loop = tasklet.loop
223
224 @property
225 def name(self):
226 return self._name
227
228 @property
229 def log(self):
230 return self._log
231
232 @property
233 def prefix(self):
234 return self._prefix
235
236 @property
237 def pbcm(self):
238 return self._pbcm
239
240 @property
241 def config(self):
242 return self._pbcm.project_config
243
244 @property
245 def tasklet(self):
246 return self._tasklet
247
248 @property
249 def log_hdl(self):
250 return self._log_hdl
251
252 @property
253 def dts(self):
254 return self._dts
255
256 @property
257 def loop(self):
258 return self._loop
259
260 @name.setter
261 def name(self, value):
262 if self._name is None:
263 self._name = value
264 self._prefix = "{}[{}='{}']".format(XPATH,
265 NS_NAME,
266 self._name)
267 self._pbcm = RwProjectManoYang.YangData_RwProject_Project(
268 name=self._name)
269
270 elif self._name == value:
271 self._log.debug("Setting the same name again for project {}".
272 format(value))
273 else:
274 msg = "Project name already set to {}".format(self._name)
275 self._log.error(msg)
276 raise ManoProjNameSetErr(msg)
277
278 def set_from_xpath(self, xpath):
279 self.name = ManoProject.from_xpath(xpath, self._log)
280
281 def add_project(self, xpath):
282 return ManoProject.prefix_project(xpath, log=self._log, project=self._name)
283
284 @abc.abstractmethod
285 @asyncio.coroutine
286 def delete_prepare(self):
287 self._log.debug("Delete prepare for project {}".format(self._name))
288 return True
289
290 @abc.abstractmethod
291 @asyncio.coroutine
292 def register(self):
293 msg = "Register not implemented for project type {}". \
294 format(self.__class__.__name__)
295 self._log.error(msg)
296 raise NotImplementedError(msg)
297
298 @abc.abstractmethod
299 def deregister(self):
300 msg = "De-register not implemented for project type {}". \
301 format(self.__class__.__name__)
302 self._log.error(msg)
303 raise NotImplementedError(msg)
304
305 def rpc_check(self, msg, xact_info=None):
306 '''Check if the rpc is for this project'''
307 try:
308 project = msg.project_name
309 except AttributeError as e:
310 project = DEFAULT_PROJECT
311
312 if project != self.name:
313 self._log.debug("Project {}: RPC is for different project {}".
314 format(self.name, project))
315 if xact_info:
316 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
317 return False
318
319 return True
320
321 @asyncio.coroutine
322 def create_project(self, dts):
323 proj_xpath = "C,{}/config".format(self.prefix)
324 self._log.info("Creating project: {} with {}".
325 format(proj_xpath, self.config.as_dict()))
326
327 yield from dts.query_create(proj_xpath,
328 rwdts.XactFlag.ADVISE,
329 self.config)
330
331
332 def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
333 #TODO: Check why this is getting called during project delete
334 if not dts_member_reg:
335 return [], [], []
336
337 # Unforunately, it is currently difficult to figure out what has exactly
338 # changed in this xact without Pbdelta support (RIFT-4916)
339 # As a workaround, we can fetch the pre and post xact elements and
340 # perform a comparison to figure out adds/deletes/updates
341 xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
342 curr_cfgs = list(dts_member_reg.elements)
343
344 xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
345 curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
346
347 # Find Adds
348 added_keys = set(xact_key_map) - set(curr_key_map)
349 added_cfgs = [xact_key_map[key] for key in added_keys]
350
351 # Find Deletes
352 deleted_keys = set(curr_key_map) - set(xact_key_map)
353 deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
354
355 # Find Updates
356 updated_keys = set(curr_key_map) & set(xact_key_map)
357 updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
358
359 return added_cfgs, deleted_cfgs, updated_cfgs
360
361
362 class ProjectConfigCallbacks(object):
363 def __init__(self,
364 on_add_apply=None, on_add_prepare=None,
365 on_delete_apply=None, on_delete_prepare=None):
366
367 @asyncio.coroutine
368 def prepare_noop(*args, **kwargs):
369 pass
370
371 def apply_noop(*args, **kwargs):
372 pass
373
374 self.on_add_apply = on_add_apply
375 self.on_add_prepare = on_add_prepare
376 self.on_delete_apply = on_delete_apply
377 self.on_delete_prepare = on_delete_prepare
378
379 for f in ('on_add_apply', 'on_delete_apply'):
380 ref = getattr(self, f)
381 if ref is None:
382 setattr(self, f, apply_noop)
383 continue
384
385 if asyncio.iscoroutinefunction(ref):
386 raise ValueError('%s cannot be a coroutine' % (f,))
387
388 for f in ('on_add_prepare', 'on_delete_prepare'):
389 ref = getattr(self, f)
390 if ref is None:
391 setattr(self, f, prepare_noop)
392 continue
393
394 if not asyncio.iscoroutinefunction(ref):
395 raise ValueError("%s must be a coroutine" % f)
396
397
398 class ProjectDtsHandler(object):
399 XPATH = "C,{}/project-config".format(XPATH)
400
401 def __init__(self, dts, log, callbacks, sub_config=True):
402 self._dts = dts
403 self._log = log
404 self._callbacks = callbacks
405
406 if sub_config:
407 self.xpath = ProjectDtsHandler.XPATH
408 self._key = 'name_ref'
409 else:
410 self.xpath = "C,{}".format(XPATH)
411 self._key = 'name'
412
413 self.reg = None
414 self.projects = []
415
416 @property
417 def log(self):
418 return self._log
419
420 @property
421 def dts(self):
422 return self._dts
423
424 def add_project(self, name):
425 self.log.info("Adding project: {}".format(name))
426
427 if name not in self.projects:
428 self._callbacks.on_add_apply(name)
429 self.projects.append(name)
430 else:
431 self.log.error("Project already present: {}".
432 format(name))
433
434 def delete_project(self, name):
435 self._log.info("Deleting project: {}".format(name))
436 if name in self.projects:
437 self._callbacks.on_delete_apply(name)
438 self.projects.remove(name)
439 else:
440 self.log.error("Unrecognized project: {}".
441 format(name))
442
443 def update_project(self, name):
444 """ Update an existing project
445
446 Currently, we do not take any action on MANO for this,
447 so no callbacks are defined
448
449 Arguments:
450 msg - The project config message
451 """
452 self._log.info("Updating project: {}".format(name))
453 if name in self.projects:
454 pass
455 else:
456 self.add_project(name)
457
458 def register(self):
459 def on_init(acg, xact, scratch):
460 self._log.debug("on_init")
461 scratch["projects"] = {
462 "added": [],
463 "deleted": [],
464 "updated": [],
465 }
466 return scratch
467
468 @asyncio.coroutine
469 def apply_config(dts, acg, xact, action, scratch):
470 self._log.debug("Got project apply config (xact: %s) (action: %s): %s",
471 xact, action, scratch)
472
473 if xact.xact is None:
474 if action == rwdts.AppconfAction.INSTALL:
475 curr_cfg = self._reg.elements
476 for cfg in curr_cfg:
477 self._log.debug("Project being re-added after restart.")
478 self.add_project(cfg.name_ref)
479 else:
480 # When RIFT first comes up, an INSTALL is called with the current config
481 # Since confd doesn't actally persist data this never has any data so
482 # skip this for now.
483 self._log.debug("No xact handle. Skipping apply config")
484
485 return
486
487 try:
488 add_cfgs = scratch["projects"]["added"]
489 except KeyError:
490 add_cfgs = []
491
492 try:
493 del_cfgs = scratch["projects"]["deleted"]
494 except KeyError:
495 del_cfgs = []
496
497 try:
498 update_cfgs = scratch["projects"]["updated"]
499 except KeyError:
500 update_cfgs = []
501
502
503 # Handle Deletes
504 for name in del_cfgs:
505 self.delete_project(name)
506
507 # Handle Adds
508 for name, msg in add_cfgs:
509 self.add_project(name)
510
511 # Handle Updates
512 for name, msg in update_cfgs:
513 self.update_project(name)
514
515 return RwTypes.RwStatus.SUCCESS
516
517 @asyncio.coroutine
518 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
519 """ Prepare callback from DTS for Project """
520
521 action = xact_info.query_action
522 xpath = ks_path.to_xpath(RwProjectManoYang.get_schema())
523 self._log.debug("Project xpath: {}".format(xpath))
524 name = ManoProject.from_xpath(xpath, self._log)
525
526 self._log.debug("Project %s on_prepare config received (action: %s): %s",
527 name, xact_info.query_action, msg)
528
529 if action == rwdts.QueryAction.CREATE:
530 if name in self.projects:
531 self._log.debug("Project {} already exists. Ignore request".
532 format(name))
533 else:
534 yield from self._callbacks.on_add_prepare(name)
535 scratch["projects"]["added"].append((name, msg))
536
537 elif action == rwdts.QueryAction.UPDATE:
538 if name in self.projects:
539 scratch["projects"]["updated"].append((name, msg))
540 else:
541 self._log.debug("Project {}: Invoking on_prepare add request".
542 format(name))
543 yield from self._callbacks.on_add_prepare(name)
544 scratch["projects"]["added"].append((name, msg))
545
546
547 elif action == rwdts.QueryAction.DELETE:
548 # Check if the entire project got deleted
549 fref = ProtobufC.FieldReference.alloc()
550 fref.goto_whole_message(msg.to_pbcm())
551 if fref.is_field_deleted():
552 if name in self.projects:
553 rc = yield from self._callbacks.on_delete_prepare(name)
554 if not rc:
555 self._log.error("Project {} should not be deleted".
556 format(name))
557 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
558 return
559
560 scratch["projects"]["deleted"].append(name)
561 else:
562 self._log.warning("Delete on unknown project: {}".
563 format(name))
564
565 else:
566 self._log.error("Action (%s) NOT SUPPORTED", action)
567 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
568 return
569
570 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
571
572 self._log.debug("Registering for project config using xpath: %s",
573 ProjectDtsHandler.XPATH,
574 )
575
576 acg_handler = rift.tasklets.AppConfGroup.Handler(
577 on_apply=apply_config,
578 on_init=on_init)
579
580 with self._dts.appconf_group_create(acg_handler) as acg:
581 self._reg = acg.register(
582 xpath=ProjectDtsHandler.XPATH,
583 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
584 on_prepare=on_prepare,
585 )
586
587
588 class ProjectHandler(object):
589 def __init__(self, tasklet, project_class, **kw):
590 self._tasklet = tasklet
591 self._log = tasklet.log
592 self._log_hdl = tasklet.log_hdl
593 self._dts = tasklet.dts
594 self._loop = tasklet.loop
595 self._class = project_class
596 self._kw = kw
597
598 self._log.debug("Creating project config handler")
599 self.project_cfg_handler = ProjectDtsHandler(
600 self._dts, self._log,
601 ProjectConfigCallbacks(
602 on_add_apply=self.on_project_added,
603 on_add_prepare=self.on_add_prepare,
604 on_delete_apply=self.on_project_deleted,
605 on_delete_prepare=self.on_delete_prepare,
606 )
607 )
608
609 def _get_tasklet_name(self):
610 return self._tasklet.tasklet_info.instance_name
611
612 def _get_project(self, name):
613 try:
614 proj = self._tasklet.projects[name]
615 except Exception as e:
616 self._log.exception("Project {} ({})not found for tasklet {}: {}".
617 format(name, list(self._tasklet.projects.keys()),
618 self._get_tasklet_name(), e))
619 raise e
620
621 return proj
622
623 def on_project_deleted(self, name):
624 self._log.debug("Project {} deleted".format(name))
625 try:
626 self._get_project(name).deregister()
627 except Exception as e:
628 self._log.exception("Project {} deregister for {} failed: {}".
629 format(name, self._get_tasklet_name(), e))
630
631 try:
632 proj = self._tasklet.projects.pop(name)
633 del proj
634 except Exception as e:
635 self._log.exception("Project {} delete for {} failed: {}".
636 format(name, self._get_tasklet_name(), e))
637
638 def on_project_added(self, name):
639 self._log.debug("Project {} added to tasklet {}".
640 format(name, self._get_tasklet_name()))
641 self._get_project(name)._apply = True
642
643 @asyncio.coroutine
644 def on_add_prepare(self, name):
645 self._log.debug("Project {} to be added to {}".
646 format(name, self._get_tasklet_name()))
647
648 try:
649 self._tasklet.projects[name] = \
650 self._class(name, self._tasklet, **(self._kw))
651 except Exception as e:
652 self._log.exception("Project {} create for {} failed: {}".
653 format(name, self._get_tasklet_name(), e))
654
655 try:
656 yield from self._get_project(name).register()
657 except Exception as e:
658 self._log.exception("Project {} register for tasklet {} failed: {}".
659 format(name, self._get_tasklet_name(), e))
660
661 @asyncio.coroutine
662 def on_delete_prepare(self, name):
663 self._log.debug("Project {} being deleted for tasklet {}".
664 format(name, self._get_tasklet_name()))
665 rc = yield from self._get_project(name).delete_prepare()
666 return rc
667
668 def register(self):
669 self.project_cfg_handler.register()