update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / utils / project.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2017 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import abc
20 import asyncio
21 import logging
22 from time import sleep
23
24 import gi
25 gi.require_version('RwProjectManoYang', '1.0')
26 gi.require_version('RwDts', '1.0')
27 from gi.repository import (
28 RwProjectManoYang,
29 RwDts as rwdts,
30 ProtobufC,
31 RwTypes,
32 )
33
34 import rift.tasklets
35
36
37 class ManoProjectError(Exception):
38 pass
39
40
41 class ManoProjNameSetErr(ManoProjectError):
42 pass
43
44
45 class ManoProjXpathNoProjErr(ManoProjectError):
46 pass
47
48
49 class ManoProjXpathKeyErr(ManoProjectError):
50 pass
51
52
53 class ManoProjXpathNotRootErr(ManoProjectError):
54 pass
55
56
57 class ManoProjXpathPresentErr(ManoProjectError):
58 pass
59
60
61 NS = 'rw-project'
62 PROJECT = 'project'
63 NS_PROJECT = '{}:{}'.format(NS, PROJECT)
64 XPATH = '/{}'.format(NS_PROJECT)
65 XPATH_LEN = len(XPATH)
66
67 NAME = 'name'
68 NAME_LEN = len(NAME)
69 NS_NAME = '{}:{}'.format(NS, NAME)
70
71 DEFAULT_PROJECT = 'default'
72 DEFAULT_PREFIX = "{}[{}='{}']".format(XPATH,
73 NS_NAME,
74 DEFAULT_PROJECT)
75
76
77 class ManoProject(object):
78 '''Class to handle the project name'''
79
80 log = None
81
82 @classmethod
83 def instance_from_xpath(cls, xpath, log):
84 name = cls.from_xpath(xpath, log)
85 if name is None:
86 return None
87
88 proj = ManoProject(log, name=name)
89 return proj
90
91 @classmethod
92 def from_xpath(cls, xpath, log):
93 log.debug("Get project name from {}".format(xpath));
94
95 if XPATH in xpath:
96 idx = xpath.find(XPATH)
97 if idx == -1:
98 msg = "Project not found in XPATH: {}".format(xpath)
99 log.error(msg)
100 raise ManoProjXpathNoProjErr(msg)
101
102 sub = xpath[idx+XPATH_LEN:].strip()
103 if (len(sub) < NAME_LEN) or (sub[0] != '['):
104 msg = "Project name not found in XPath: {}".format(xpath)
105 log.error(msg)
106 raise ManoProjXpathKeyErr(msg)
107
108 sub = sub[1:].strip()
109 idx = sub.find(NS_NAME)
110 if idx == -1:
111 idx = sub.find(NAME)
112 if idx != 0:
113 msg = "Project name not found in XPath: {}".format(xpath)
114 log.error(msg)
115 raise ManoProjXpathKeyErr(msg)
116
117 idx = sub.find(']')
118 if idx == -1:
119 msg = "XPath is invalid: {}".format(xpath)
120 log.error(msg)
121 raise ManoProjXpathKeyErr(msg)
122
123 sub = sub[:idx].strip()
124 try:
125 log.debug("Key and value found: {}".format(sub))
126 k, n = sub.split("=", 2)
127 name = n.strip(' \'"')
128 if name is None:
129 msg = "Project name is empty in XPath".format(xpath)
130 log.error(msg)
131 raise ManoProjXpathKeyErr (msg)
132
133 log.debug("Found project name {} from XPath {}".
134 format(name, xpath))
135 return name
136
137 except ValueError as e:
138 msg = "Project name not found in XPath: {}, exception: {}" \
139 .format(xpath, e)
140 log.exception(msg)
141 raise ManoProjXpathKeyErr(msg)
142 else:
143 msg = "Project not found in XPATH: {}".format(xpath)
144 log.error(msg)
145 raise ManoProjXpathNoProjErr(msg)
146
147 @classmethod
148 def get_log(cls):
149 if not cls.log:
150 cls.log = logging.getLogger('rw-mano-log.rw-project')
151 cls.log.setLevel(logging.ERROR)
152
153 @classmethod
154 def prefix_project(cls, xpath, project=None, log=None):
155 if log is None:
156 log = cls.get_log()
157
158 if project is None:
159 project = DEFAULT_PROJECT
160 proj_prefix = DEFAULT_PREFIX
161 else:
162 proj_prefix = "{}[{}='{}']".format(XPATH,
163 NS_NAME,
164 project)
165
166 prefix = ''
167 suffix = xpath
168 idx = xpath.find('C,/')
169 if idx == -1:
170 idx = xpath.find('D,/')
171
172 suffix = xpath
173 if idx != -1:
174 prefix = xpath[:2]
175 suffix = xpath[2:]
176
177 if suffix[0] != '/':
178 msg = "Non-rooted xpath provided: {}".format(xpath)
179 log.error(msg)
180 raise ManoProjXpathNotRootErr(msg)
181
182 idx = suffix.find(XPATH)
183 if idx == 0:
184 name = cls.from_xpath(xpath, log)
185 if name == project:
186 log.debug("Project already in the XPATH: {}".format(xpath))
187 return xpath
188
189 else:
190 msg = "Different project {} already in XPATH {}". \
191 format(name, xpath)
192 log.error(msg)
193 raise ManoProjXpathPresentErr(msg)
194
195 ret = prefix + proj_prefix + suffix
196 return ret
197
198
199 def __init__(self, log, name=None, tasklet=None):
200 self._log = log
201 self._name = None
202 self._prefix = None
203 self._pbcm = None
204 self._tasklet = None
205 self._dts = None
206 self._loop = None
207 self._log_hdl = None
208
209 # Track if the apply config was received
210 self._apply = False
211
212 if name:
213 self.name = name
214
215 def update(self, tasklet):
216 # Store the commonly used properties from a tasklet
217 self._tasklet = tasklet
218 self._log_hdl = tasklet.log_hdl
219 self._dts = tasklet.dts
220 self._loop = tasklet.loop
221
222 @property
223 def name(self):
224 return self._name
225
226 @property
227 def log(self):
228 return self._log
229
230 @property
231 def prefix(self):
232 return self._prefix
233
234 @property
235 def pbcm(self):
236 return self._pbcm
237
238 @property
239 def config(self):
240 return self._pbcm.project_config
241
242 @property
243 def tasklet(self):
244 return self._tasklet
245
246 @property
247 def log_hdl(self):
248 return self._log_hdl
249
250 @property
251 def dts(self):
252 return self._dts
253
254 @property
255 def loop(self):
256 return self._loop
257
258 @name.setter
259 def name(self, value):
260 if self._name is None:
261 self._name = value
262 self._prefix = "{}[{}='{}']".format(XPATH,
263 NS_NAME,
264 self._name)
265 self._pbcm = RwProjectManoYang.YangData_RwProject_Project(
266 name=self._name)
267
268 elif self._name == value:
269 self._log.debug("Setting the same name again for project {}".
270 format(value))
271 else:
272 msg = "Project name already set to {}".format(self._name)
273 self._log.error(msg)
274 raise ManoProjNameSetErr(msg)
275
276 def set_from_xpath(self, xpath):
277 self.name = ManoProject.from_xpath(xpath, self._log)
278
279 def add_project(self, xpath):
280 return ManoProject.prefix_project(xpath, log=self._log, project=self._name)
281
282 @abc.abstractmethod
283 @asyncio.coroutine
284 def delete_prepare(self):
285 self._log.debug("Delete prepare for project {}".format(self._name))
286 return (True, "True")
287
288 @abc.abstractmethod
289 @asyncio.coroutine
290 def register(self):
291 msg = "Register not implemented for project type {}". \
292 format(self.__class__.__name__)
293 self._log.error(msg)
294 raise NotImplementedError(msg)
295
296 @abc.abstractmethod
297 def deregister(self):
298 msg = "De-register not implemented for project type {}". \
299 format(self.__class__.__name__)
300 self._log.error(msg)
301 raise NotImplementedError(msg)
302
303 def rpc_check(self, msg, xact_info=None):
304 '''Check if the rpc is for this project'''
305 try:
306 project = msg.project_name
307 except AttributeError as e:
308 project = DEFAULT_PROJECT
309
310 if project != self.name:
311 self._log.debug("Project {}: RPC is for different project {}".
312 format(self.name, project))
313 if xact_info:
314 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
315 return False
316
317 return True
318
319 @asyncio.coroutine
320 def create_project(self, dts):
321 proj_xpath = "C,{}/config".format(self.prefix)
322 self._log.info("Creating project: {} with {}".
323 format(proj_xpath, self.config.as_dict()))
324
325 yield from dts.query_create(proj_xpath,
326 rwdts.XactFlag.ADVISE,
327 self.config)
328
329
330 def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
331 #TODO: Check why this is getting called during project delete
332 if not dts_member_reg:
333 return [], [], []
334
335 # Unforunately, it is currently difficult to figure out what has exactly
336 # changed in this xact without Pbdelta support (RIFT-4916)
337 # As a workaround, we can fetch the pre and post xact elements and
338 # perform a comparison to figure out adds/deletes/updates
339 xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
340 curr_cfgs = list(dts_member_reg.elements)
341
342 xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
343 curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
344
345 # Find Adds
346 added_keys = set(xact_key_map) - set(curr_key_map)
347 added_cfgs = [xact_key_map[key] for key in added_keys]
348
349 # Find Deletes
350 deleted_keys = set(curr_key_map) - set(xact_key_map)
351 deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
352
353 # Find Updates
354 updated_keys = set(curr_key_map) & set(xact_key_map)
355 updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
356
357 return added_cfgs, deleted_cfgs, updated_cfgs
358
359
360 class ProjectConfigCallbacks(object):
361 def __init__(self,
362 on_add_apply=None, on_add_prepare=None,
363 on_delete_apply=None, on_delete_prepare=None):
364
365 @asyncio.coroutine
366 def prepare_noop(*args, **kwargs):
367 pass
368
369 def apply_noop(*args, **kwargs):
370 pass
371
372 self.on_add_apply = on_add_apply
373 self.on_add_prepare = on_add_prepare
374 self.on_delete_apply = on_delete_apply
375 self.on_delete_prepare = on_delete_prepare
376
377 for f in ('on_add_apply', 'on_delete_apply'):
378 ref = getattr(self, f)
379 if ref is None:
380 setattr(self, f, apply_noop)
381 continue
382
383 if asyncio.iscoroutinefunction(ref):
384 raise ValueError('%s cannot be a coroutine' % (f,))
385
386 for f in ('on_add_prepare', 'on_delete_prepare'):
387 ref = getattr(self, f)
388 if ref is None:
389 setattr(self, f, prepare_noop)
390 continue
391
392 if not asyncio.iscoroutinefunction(ref):
393 raise ValueError("%s must be a coroutine" % f)
394
395
396 class ProjectDtsHandler(object):
397 XPATH = "C,{}/project-config".format(XPATH)
398
399 def __init__(self, dts, log, callbacks, sub_config=True):
400 self._dts = dts
401 self._log = log
402 self._callbacks = callbacks
403
404 if sub_config:
405 self.xpath = ProjectDtsHandler.XPATH
406 self._key = 'name_ref'
407 else:
408 self.xpath = "C,{}".format(XPATH)
409 self._key = 'name'
410
411 self.reg = None
412 self.projects = []
413
414 @property
415 def log(self):
416 return self._log
417
418 @property
419 def dts(self):
420 return self._dts
421
422 def add_project(self, name):
423 self._log.info("Adding project: {}".format(name))
424
425 if name not in self.projects:
426 self._callbacks.on_add_apply(name)
427 self.projects.append(name)
428 else:
429 self._log.error("Project already present: {}".
430 format(name))
431
432 def delete_project(self, name):
433 self._log.info("Deleting project: {}".format(name))
434 if name in self.projects:
435 self._callbacks.on_delete_apply(name)
436 self.projects.remove(name)
437 else:
438 self._log.error("Unrecognized project: {}".
439 format(name))
440
441 def update_project(self, name):
442 """ Update an existing project
443
444 Currently, we do not take any action on MANO for this,
445 so no callbacks are defined
446
447 Arguments:
448 msg - The project config message
449 """
450 self._log.info("Updating project: {}".format(name))
451 if name in self.projects:
452 pass
453 else:
454 self.add_project(name)
455
456 def register(self):
457 def on_init(acg, xact, scratch):
458 self._log.debug("on_init")
459 scratch["projects"] = {
460 "added": [],
461 "deleted": [],
462 "updated": [],
463 }
464 return scratch
465
466 def readd_projects(xact):
467 self._log.info("Re-add projects")
468
469 for cfg, ks in self._reg.get_xact_elements(xact, include_keyspec=True):
470 xpath = ks.to_xpath(RwProjectManoYang.get_schema())
471 self._log.debug("Got ks {} for cfg {}".format(xpath, cfg.as_dict()))
472 name = ManoProject.from_xpath(xpath, self._log)
473 self._log.debug("Project to add: {}".format(name))
474 self.add_project(name)
475
476 @asyncio.coroutine
477 def apply_config(dts, acg, xact, action, scratch):
478 self._log.debug("Got project apply config (xact: %s) (action: %s): %s",
479 xact, action, scratch)
480
481 if xact.xact is None:
482 if action == rwdts.AppconfAction.INSTALL:
483 readd_projects(xact)
484 else:
485 self._log.debug("No xact handle. Skipping apply config")
486
487 return
488
489 try:
490 add_cfgs = scratch["projects"]["added"]
491 except KeyError:
492 add_cfgs = []
493
494 try:
495 del_cfgs = scratch["projects"]["deleted"]
496 except KeyError:
497 del_cfgs = []
498
499 try:
500 update_cfgs = scratch["projects"]["updated"]
501 except KeyError:
502 update_cfgs = []
503
504
505 # Handle Deletes
506 for name in del_cfgs:
507 self.delete_project(name)
508
509 # Handle Adds
510 for name, msg in add_cfgs:
511 self.add_project(name)
512
513 # Handle Updates
514 for name, msg in update_cfgs:
515 self.update_project(name)
516
517 try:
518 del scratch["projects"]
519 except KeyError:
520 pass
521
522 return RwTypes.RwStatus.SUCCESS
523
524 @asyncio.coroutine
525 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
526 """ Prepare callback from DTS for Project """
527
528 action = xact_info.query_action
529 xpath = ks_path.to_xpath(RwProjectManoYang.get_schema())
530 self._log.debug("Project xpath: {}".format(xpath))
531 name = ManoProject.from_xpath(xpath, self._log)
532
533 self._log.debug("Project %s on_prepare config received (action: %s): %s",
534 name, xact_info.query_action, msg)
535
536 if action == rwdts.QueryAction.CREATE:
537 if name in self.projects:
538 self._log.debug("Project {} already exists. Ignore request".
539 format(name))
540 else:
541 yield from self._callbacks.on_add_prepare(name)
542 scratch["projects"]["added"].append((name, msg))
543
544 elif action == rwdts.QueryAction.UPDATE:
545 if name in self.projects:
546 scratch["projects"]["updated"].append((name, msg))
547 else:
548 self._log.debug("Project {}: Invoking on_prepare add request".
549 format(name))
550 yield from self._callbacks.on_add_prepare(name)
551 scratch["projects"]["added"].append((name, msg))
552
553
554 elif action == rwdts.QueryAction.DELETE:
555 # Check if the entire project got deleted
556 fref = ProtobufC.FieldReference.alloc()
557 fref.goto_whole_message(msg.to_pbcm())
558 if fref.is_field_deleted():
559 if name in self.projects:
560 rc, delete_msg = yield from self._callbacks.on_delete_prepare(name)
561 if not rc:
562 self._log.error("Project {} should not be deleted. Reason : {}".
563 format(name, delete_msg))
564
565 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
566 ProjectDtsHandler.XPATH,
567 delete_msg)
568
569 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
570 return
571
572 scratch["projects"]["deleted"].append(name)
573 else:
574 self._log.warning("Delete on unknown project: {}".
575 format(name))
576 else:
577 self._log.error("Action (%s) NOT SUPPORTED", action)
578 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
579 return
580 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
581
582 self._log.debug("Registering for project config using xpath: %s",
583 ProjectDtsHandler.XPATH,
584 )
585
586 acg_handler = rift.tasklets.AppConfGroup.Handler(
587 on_apply=apply_config,
588 on_init=on_init)
589
590 with self._dts.appconf_group_create(acg_handler) as acg:
591 self._reg = acg.register(
592 xpath=ProjectDtsHandler.XPATH,
593 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
594 on_prepare=on_prepare,
595 )
596
597
598 class ProjectHandler(object):
599 def __init__(self, tasklet, project_class, **kw):
600 self._tasklet = tasklet
601 self._log = tasklet.log
602 self._log_hdl = tasklet.log_hdl
603 self._dts = tasklet.dts
604 self._loop = tasklet.loop
605 self._class = project_class
606 self._kw = kw
607
608 self._log.debug("Creating project config handler")
609 self.project_cfg_handler = ProjectDtsHandler(
610 self._dts, self._log,
611 ProjectConfigCallbacks(
612 on_add_apply=self.on_project_added,
613 on_add_prepare=self.on_add_prepare,
614 on_delete_apply=self.on_project_deleted,
615 on_delete_prepare=self.on_delete_prepare,
616 )
617 )
618
619 def _get_tasklet_name(self):
620 return self._tasklet.tasklet_info.instance_name
621
622 def _get_project(self, name):
623 try:
624 proj = self._tasklet.projects[name]
625 except Exception as e:
626 self._log.exception("Project {} ({})not found for tasklet {}: {}".
627 format(name, list(self._tasklet.projects.keys()),
628 self._get_tasklet_name(), e))
629 raise e
630
631 return proj
632
633 def on_project_deleted(self, name):
634 self._log.debug("Project {} deleted".format(name))
635 try:
636 self._get_project(name).deregister()
637 except Exception as e:
638 self._log.exception("Project {} deregister for {} failed: {}".
639 format(name, self._get_tasklet_name(), e))
640
641 try:
642 proj = self._tasklet.projects.pop(name)
643 del proj
644 except Exception as e:
645 self._log.exception("Project {} delete for {} failed: {}".
646 format(name, self._get_tasklet_name(), e))
647
648 def on_project_added(self, name):
649 if name not in self._tasklet.projects:
650 try:
651 self._tasklet.projects[name] = \
652 self._class(name, self._tasklet, **(self._kw))
653 task = asyncio.ensure_future(self._get_project(name).register(),
654 loop=self._loop)
655
656 self._log.debug("Project {} register: {}".format(name, str(task)))
657
658 except Exception as e:
659 self._log.exception("Project {} create for {} failed: {}".
660 format(name, self._get_tasklet_name(), e))
661 raise e
662
663 self._log.debug("Project {} added to tasklet {}".
664 format(name, self._get_tasklet_name()))
665 self._get_project(name)._apply = True
666
667 @asyncio.coroutine
668 def on_add_prepare(self, name):
669 self._log.debug("Project {} to be added to {}".
670 format(name, self._get_tasklet_name()))
671 if name in self._tasklet.projects:
672 self._log.error("Project {} already exists for {}".
673 format(name, self._get_tasklet_name()))
674 return
675
676 try:
677 self._tasklet.projects[name] = \
678 self._class(name, self._tasklet, **(self._kw))
679 yield from self._get_project(name).register()
680
681 except Exception as e:
682 self._log.exception("Project {} create for {} failed: {}".
683 format(name, self._get_tasklet_name(), e))
684 raise e
685
686 @asyncio.coroutine
687 def on_delete_prepare(self, name):
688 self._log.debug("Project {} being deleted for tasklet {}".
689 format(name, self._get_tasklet_name()))
690 rc, delete_msg = yield from self._get_project(name).delete_prepare()
691 return rc, delete_msg
692
693 def register(self):
694 self.project_cfg_handler.register()