de495e9a696ffae443bf8ba6ec42097017b01950
[osm/SO.git] / common / python / rift / mano / utils / project.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2017 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import abc
20 import asyncio
21 import logging
22
23 import gi
24 gi.require_version('RwProjectManoYang', '1.0')
25 gi.require_version('RwDtsYang', '1.0')
26 from gi.repository import (
27 RwProjectManoYang,
28 RwDts as rwdts,
29 ProtobufC,
30 RwTypes,
31 )
32
33 import rift.tasklets
34
35
class ManoProjectError(Exception):
    """Base class for every project error raised by this module."""


class ManoProjNameSetErr(ManoProjectError):
    """A project name was assigned after a different name was already set."""


class ManoProjXpathNoProjErr(ManoProjectError):
    """The XPath does not contain the project element at all."""


class ManoProjXpathKeyErr(ManoProjectError):
    """The project element in the XPath has a missing or malformed name key."""


class ManoProjXpathNotRootErr(ManoProjectError):
    """The XPath to be prefixed with a project does not start at the root."""


class ManoProjXpathPresentErr(ManoProjectError):
    """The XPath already names a project other than the requested one."""
58
59
# Building blocks of the project XPath: '<NS>:<PROJECT>' is the qualified
# element name and XPATH is that element rooted at '/'.
NS = 'rw-project'
PROJECT = 'project'
NS_PROJECT = NS + ':' + PROJECT
XPATH = '/' + NS_PROJECT
XPATH_LEN = len(XPATH)

# Key of the project element, both bare and qualified as '<NS>:<NAME>'.
NAME = 'name'
NAME_LEN = len(NAME)
NS_NAME = NS + ':' + NAME

# Project used when none is given, and its ready-made XPath prefix.
DEFAULT_PROJECT = 'default'
DEFAULT_PREFIX = "{xpath}[{key}='{value}']".format(
    xpath=XPATH,
    key=NS_NAME,
    value=DEFAULT_PROJECT,
)
74
75
class ManoProject(object):
    '''Class to handle the project name

    Holds a project name together with the XPath prefix and the
    RwProjectManoYang project message built from it, and offers helpers to
    read a project name out of an XPath or to prefix an XPath with a project.
    '''

    # Fallback logger created lazily by get_log(). It is kept under a private
    # name because the `log` property defined further down owns the `log`
    # attribute of the class.
    _default_log = None

    @classmethod
    def instance_from_xpath(cls, xpath, log):
        '''Create a ManoProject named after the project found in xpath.

        Arguments:
            xpath - XPath string containing the project element
            log - logger used for parsing and stored on the new instance

        Returns:
            A ManoProject instance, or None if no name was obtained

        Raises:
            ManoProjXpathNoProjErr, ManoProjXpathKeyErr - see from_xpath()
        '''
        name = cls.from_xpath(xpath, log)
        if name is None:
            return None

        return ManoProject(log, name=name)

    @classmethod
    def from_xpath(cls, xpath, log):
        '''Extract the project name from an XPath.

        The first occurrence of the project element must be followed by a
        key predicate of the form [rw-project:name='<name>'] or
        [name='<name>'].

        Arguments:
            xpath - XPath string to parse
            log - logger for debug and error output

        Returns:
            The project name, with surrounding spaces and quotes removed

        Raises:
            ManoProjXpathNoProjErr - the project element is not in xpath
            ManoProjXpathKeyErr - the name key is missing, malformed or empty
        '''
        log.debug("Get project name from {}".format(xpath))

        idx = xpath.find(XPATH)
        if idx == -1:
            msg = "Project not found in XPATH: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathNoProjErr(msg)

        # Whatever follows the project element must open a key predicate
        sub = xpath[idx + XPATH_LEN:].strip()
        if (len(sub) < NAME_LEN) or (sub[0] != '['):
            msg = "Project name not found in XPath: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        # The predicate has to start with the (optionally qualified) name key
        sub = sub[1:].strip()
        idx = sub.find(NS_NAME)
        if idx == -1:
            idx = sub.find(NAME)
        if idx != 0:
            msg = "Project name not found in XPath: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        idx = sub.find(']')
        if idx == -1:
            msg = "XPath is invalid: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        sub = sub[:idx].strip()
        log.debug("Key and value found: {}".format(sub))
        try:
            # Split on the first '=' only, so a value that itself contains
            # '=' is kept whole instead of failing the unpacking.
            _, value = sub.split("=", 1)
        except ValueError as e:
            msg = "Project name not found in XPath: {}, exception: {}" \
                  .format(xpath, e)
            log.exception(msg)
            raise ManoProjXpathKeyErr(msg) from e

        name = value.strip(' \'"')
        if not name:
            msg = "Project name is empty in XPath: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        log.debug("Found project name {} from XPath {}".
                  format(name, xpath))
        return name

    @classmethod
    def get_log(cls):
        '''Return the shared fallback logger, creating it on first use.'''
        if ManoProject._default_log is None:
            logger = logging.getLogger('rw-mano-log.rw-project')
            logger.setLevel(logging.ERROR)
            ManoProject._default_log = logger

        return ManoProject._default_log

    @classmethod
    def prefix_project(cls, xpath, project=None, log=None):
        '''Return xpath prefixed with the project element for project.

        A leading 'C,' or 'D,' marker is kept in front of the added prefix.

        Arguments:
            xpath - rooted XPath, optionally starting with 'C,' or 'D,'
            project - project name; DEFAULT_PROJECT when None
            log - logger to use; the class fallback logger when None

        Returns:
            The prefixed XPath, or xpath unchanged if it already starts with
            the same project

        Raises:
            ManoProjXpathNotRootErr - xpath (after the marker) is not rooted
            ManoProjXpathPresentErr - xpath already names another project
            ManoProjXpathKeyErr - xpath starts with a malformed project key
        '''
        if log is None:
            log = cls.get_log()

        if project is None:
            project = DEFAULT_PROJECT
            proj_prefix = DEFAULT_PREFIX
        else:
            proj_prefix = "{}[{}='{}']".format(XPATH,
                                               NS_NAME,
                                               project)

        log.debug("Add project {} to {}".format(project, xpath))

        # Only a marker at the very start of the xpath is split off
        prefix = ''
        suffix = xpath
        if xpath.startswith(('C,/', 'D,/')):
            prefix = xpath[:2]
            suffix = xpath[2:]

        if not suffix.startswith('/'):
            msg = "Non-rooted xpath provided: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathNotRootErr(msg)

        if suffix.startswith(XPATH):
            name = cls.from_xpath(xpath, log)
            if name == project:
                log.debug("Project already in the XPATH: {}".format(xpath))
                return xpath

            msg = "Different project {} already in XPATH {}". \
                  format(name, xpath)
            log.error(msg)
            raise ManoProjXpathPresentErr(msg)

        ret = prefix + proj_prefix + suffix
        log.debug("XPath with project: {}".format(ret))
        return ret

    def __init__(self, log, name=None, tasklet=None):
        '''Initialise the project.

        Arguments:
            log - logger used by this instance
            name - optional project name; when given it is set immediately
            tasklet - accepted but not stored by the constructor; call
                      update() to attach a tasklet
        '''
        self._log = log
        self._name = None
        self._prefix = None
        self._pbcm = None
        self._tasklet = None
        self._dts = None
        self._loop = None
        self._log_hdl = None

        # Track if the apply config was received
        self._apply = False

        if name:
            self.name = name

    def update(self, tasklet):
        '''Store the commonly used properties from a tasklet.'''
        self._tasklet = tasklet
        self._log_hdl = tasklet.log_hdl
        self._dts = tasklet.dts
        self._loop = tasklet.loop

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        '''Set the project name once; a different name later is an error.

        Raises:
            ManoProjNameSetErr - a different name is already set
        '''
        if self._name is None:
            self._name = value
            self._prefix = "{}[{}='{}']".format(XPATH,
                                                NS_NAME,
                                                self._name)
            self._pbcm = RwProjectManoYang.YangData_RwProject_Project(
                name=self._name)

        elif self._name == value:
            self._log.debug("Setting the same name again for project {}".
                            format(value))
        else:
            msg = "Project name already set to {}".format(self._name)
            self._log.error(msg)
            raise ManoProjNameSetErr(msg)

    @property
    def log(self):
        return self._log

    @property
    def prefix(self):
        return self._prefix

    @property
    def pbcm(self):
        return self._pbcm

    @property
    def config(self):
        # Requires the name to be set; _pbcm is None until then
        return self._pbcm.project_config

    @property
    def tasklet(self):
        return self._tasklet

    @property
    def log_hdl(self):
        return self._log_hdl

    @property
    def dts(self):
        return self._dts

    @property
    def loop(self):
        return self._loop

    def set_from_xpath(self, xpath):
        '''Set the project name from the project found in xpath.'''
        self.name = ManoProject.from_xpath(xpath, self._log)

    def add_project(self, xpath):
        '''Return xpath prefixed with this project (see prefix_project).'''
        return ManoProject.prefix_project(xpath, log=self._log, project=self._name)

    # NOTE: the class does not use abc.ABCMeta, so the abstractmethod markers
    # below do not prevent instantiation; the bodies act as defaults.
    @abc.abstractmethod
    @asyncio.coroutine
    def delete_prepare(self):
        '''Default delete-prepare hook: logs and returns True.'''
        self._log.debug("Delete prepare for project {}".format(self._name))
        return True

    @abc.abstractmethod
    @asyncio.coroutine
    def register(self):
        '''Register the project; must be provided by the project type.

        Raises:
            NotImplementedError - always, in this default implementation
        '''
        msg = "Register not implemented for project type {}". \
              format(self.__class__.__name__)
        self._log.error(msg)
        raise NotImplementedError(msg)

    @abc.abstractmethod
    def deregister(self):
        '''De-register the project; must be provided by the project type.

        Raises:
            NotImplementedError - always, in this default implementation
        '''
        msg = "De-register not implemented for project type {}". \
              format(self.__class__.__name__)
        self._log.error(msg)
        raise NotImplementedError(msg)

    def rpc_check(self, msg, xact_info=None):
        '''Check if the rpc is for this project

        A message without a project_name attribute is treated as addressed
        to DEFAULT_PROJECT. When the RPC is for another project and
        xact_info is given, the transaction is ACKed before returning False.
        '''
        project = getattr(msg, 'project_name', DEFAULT_PROJECT)

        if project != self.name:
            self._log.debug("Project {}: RPC is for different project {}".
                            format(self.name, project))
            if xact_info:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
            return False

        return True

    @asyncio.coroutine
    def create_project(self, dts):
        '''Issue a DTS create query for this project's config.'''
        proj_xpath = "C,{}/config".format(self.prefix)
        self._log.info("Creating project: {} with {}".
                       format(proj_xpath, self.config.as_dict()))

        yield from dts.query_create(proj_xpath,
                                    rwdts.XactFlag.ADVISE,
                                    self.config)
330
331
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Work out which configs a transaction adds, deletes and updates.

    Pbdelta support (RIFT-4916) is not available to report what changed in
    the xact, so the elements of the transaction are compared against the
    currently registered elements, matching them by the key_name attribute.

    Arguments:
        dts_member_reg - registration providing get_xact_elements(xact) and
                         elements; a falsy value yields three empty lists
        xact - transaction whose elements are compared
        key_name - attribute name used as the key of each element

    Returns:
        Tuple (added_cfgs, deleted_cfgs, updated_cfgs)
    """
    # TODO: Check why this is getting called during project delete
    if not dts_member_reg:
        return [], [], []

    def index_by_key(cfgs):
        keyed = {}
        for cfg in cfgs:
            keyed[getattr(cfg, key_name)] = cfg
        return keyed

    in_xact = index_by_key(list(dts_member_reg.get_xact_elements(xact)))
    in_place = index_by_key(list(dts_member_reg.elements))

    # Only in the transaction: additions
    added_cfgs = [in_xact[key] for key in set(in_xact) - set(in_place)]

    # Only in the current config: deletions
    deleted_cfgs = [in_place[key] for key in set(in_place) - set(in_xact)]

    # In both, but with differing content: updates (the xact version wins)
    updated_cfgs = []
    for key in set(in_place) & set(in_xact):
        candidate = in_xact[key]
        if candidate != in_place[key]:
            updated_cfgs.append(candidate)

    return added_cfgs, deleted_cfgs, updated_cfgs
360
361
class ProjectConfigCallbacks(object):
    """Bundle of callbacks invoked on project add/delete.

    Apply callbacks must be plain callables and prepare callbacks must be
    coroutine functions; any callback left as None is replaced by a no-op
    of the matching kind.
    """

    def __init__(self,
                 on_add_apply=None, on_add_prepare=None,
                 on_delete_apply=None, on_delete_prepare=None):

        @asyncio.coroutine
        def prepare_noop(*args, **kwargs):
            pass

        def apply_noop(*args, **kwargs):
            pass

        self.on_add_apply = on_add_apply
        self.on_add_prepare = on_add_prepare
        self.on_delete_apply = on_delete_apply
        self.on_delete_prepare = on_delete_prepare

        # Apply callbacks are validated first, then the prepare callbacks
        for attr in ('on_add_apply', 'on_delete_apply'):
            callback = getattr(self, attr)
            if callback is None:
                setattr(self, attr, apply_noop)
            elif asyncio.iscoroutinefunction(callback):
                raise ValueError('%s cannot be a coroutine' % (attr,))

        for attr in ('on_add_prepare', 'on_delete_prepare'):
            callback = getattr(self, attr)
            if callback is None:
                setattr(self, attr, prepare_noop)
            elif not asyncio.iscoroutinefunction(callback):
                raise ValueError("%s must be a coroutine" % attr)
396
397
class ProjectDtsHandler(object):
    """Subscribe to project config over DTS and track known project names.

    Project names seen in the config are kept in `projects`; the supplied
    callbacks are invoked when a project is prepared, added or deleted.
    """

    XPATH = "C,{}/project-config".format(XPATH)

    def __init__(self, dts, log, callbacks):
        """
        Arguments:
            dts - DTS handle used to create the appconf group
            log - logger
            callbacks - object providing on_add_apply, on_add_prepare,
                        on_delete_apply and on_delete_prepare
        """
        self._dts = dts
        self._log = log
        self._callbacks = callbacks

        # DTS registration; set by register()
        self._reg = None
        self.projects = []

    @property
    def log(self):
        return self._log

    @property
    def dts(self):
        return self._dts

    @property
    def reg(self):
        """The DTS registration created by register(), or None before it."""
        return self._reg

    @reg.setter
    def reg(self, value):
        self._reg = value

    def add_project(self, name):
        """Record a new project name and invoke the add-apply callback."""
        self.log.info("Adding project: {}".format(name))

        if name not in self.projects:
            self._callbacks.on_add_apply(name)
            self.projects.append(name)
        else:
            self.log.error("Project already present: {}".
                           format(name))

    def delete_project(self, name):
        """Invoke the delete-apply callback and forget the project name."""
        self._log.info("Deleting project: {}".format(name))
        if name in self.projects:
            self._callbacks.on_delete_apply(name)
            self.projects.remove(name)
        else:
            self.log.error("Unrecognized project: {}".
                           format(name))

    def update_project(self, name):
        """ Update an existing project

        Currently, we do not take any action on MANO for this,
        so no callbacks are defined

        Arguments:
            name - The name of the project being updated
        """
        self._log.info("Updating project: {}".format(name))
        if name not in self.projects:
            self.log.error("Unrecognized project: {}".
                           format(name))

    def register(self):
        """Register with DTS as a subscriber for project config."""

        @asyncio.coroutine
        def apply_config(dts, acg, xact, action, scratch):
            """ Apply callback from DTS for Project """
            self._log.debug("Got project apply config (xact: %s) (action: %s)", xact, action)

            if xact.xact is None:
                if action == rwdts.AppconfAction.INSTALL:
                    # Replay the existing config, e.g. after a restart.
                    # Projects are tracked by name, so pass the name and not
                    # the config element itself.
                    for cfg in self._reg.elements:
                        self._log.debug("Project being re-added after restart.")
                        self.add_project(cfg.name_ref)
                else:
                    # When RIFT first comes up, an INSTALL is called with the current config
                    # Since confd doesn't actually persist data this never has any data so
                    # skip this for now.
                    self._log.debug("No xact handle. Skipping apply config")

                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self._reg,
                xact=xact,
                key_name="name_ref",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_project(cfg.name_ref)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_project(cfg.name_ref)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_project(cfg.name_ref)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for Project """

            action = xact_info.query_action
            name = msg.name_ref

            self._log.debug("Project %s on_prepare config received (action: %s): %s",
                            name, xact_info.query_action, msg)

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                if name in self.projects:
                    self._log.debug("Project {} already exists. Ignore request".
                                    format(name))

                else:
                    self._log.debug("Project {}: Invoking on_prepare add request".
                                    format(name))
                    yield from self._callbacks.on_add_prepare(name)

            elif action == rwdts.QueryAction.DELETE:
                # Check if the entire project got deleted
                fref = ProtobufC.FieldReference.alloc()
                fref.goto_whole_message(msg.to_pbcm())
                if fref.is_field_deleted():
                    if name in self.projects:
                        rc = yield from self._callbacks.on_delete_prepare(name)
                        if not rc:
                            self._log.error("Project {} should not be deleted".
                                            format(name))
                            xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                            # Stop here so the NACK is not followed by an ACK
                            return
                    else:
                        self._log.warning("Delete on unknown project: {}".
                                          format(name))

            else:
                self._log.error("Action (%s) NOT SUPPORTED", action)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for project config using xpath: %s",
                        ProjectDtsHandler.XPATH,
                        )

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                xpath=ProjectDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare,
            )
565 )
566
class ProjectHandler(object):
    """Create, register and remove a tasklet's projects on config changes.

    Project instances are stored in the tasklet's `projects` mapping, keyed
    by project name.
    """

    def __init__(self, tasklet, project_class, **kw):
        """
        Arguments:
            tasklet - tasklet providing log, log_hdl, dts, loop and the
                      `projects` mapping
            project_class - callable invoked as
                            project_class(name, tasklet, **kw) for each
                            project to be added
            kw - extra keyword arguments forwarded to project_class
        """
        self._tasklet = tasklet
        self._log = tasklet.log
        self._log_hdl = tasklet.log_hdl
        self._dts = tasklet.dts
        self._loop = tasklet.loop
        self._class = project_class
        self._kw = kw

        self._log.debug("Creating project config handler")
        self.project_cfg_handler = ProjectDtsHandler(
            self._dts, self._log,
            ProjectConfigCallbacks(
                on_add_apply=self.on_project_added,
                on_add_prepare=self.on_add_prepare,
                on_delete_apply=self.on_project_deleted,
                on_delete_prepare=self.on_delete_prepare,
            )
        )

    def _get_tasklet_name(self):
        return self._tasklet.tasklet_info.instance_name

    def _get_project(self, name):
        """Return the tasklet's project called name; log and re-raise if absent."""
        try:
            proj = self._tasklet.projects[name]
        except Exception as e:
            self._log.exception("Project {} ({})not found for tasklet {}: {}".
                                format(name, list(self._tasklet.projects.keys()),
                                       self._get_tasklet_name(), e))
            # Bare raise keeps the original traceback intact
            raise

        return proj

    def on_project_deleted(self, name):
        """De-register the project and drop it from the tasklet.

        Both steps are best effort: a failure is logged and not propagated,
        and a failed de-register does not prevent the removal.
        """
        self._log.debug("Project {} deleted".format(name))
        try:
            self._get_project(name).deregister()
        except Exception as e:
            self._log.exception("Project {} deregister for {} failed: {}".
                                format(name, self._get_tasklet_name(), e))

        try:
            self._tasklet.projects.pop(name)
        except Exception as e:
            self._log.exception("Project {} delete for {} failed: {}".
                                format(name, self._get_tasklet_name(), e))

    def on_project_added(self, name):
        """Mark the project as having received its apply config."""
        self._log.debug("Project {} added to tasklet {}".
                        format(name, self._get_tasklet_name()))
        self._get_project(name)._apply = True

    @asyncio.coroutine
    def on_add_prepare(self, name):
        """Instantiate the project and register it.

        Failures in either step are logged and not propagated.
        """
        self._log.debug("Project {} to be added to {}".
                        format(name, self._get_tasklet_name()))

        try:
            self._tasklet.projects[name] = \
                self._class(name, self._tasklet, **(self._kw))
        except Exception as e:
            self._log.exception("Project {} create for {} failed: {}".
                                format(name, self._get_tasklet_name(), e))

        try:
            yield from self._get_project(name).register()
        except Exception as e:
            self._log.exception("Project {} register for tasklet {} failed: {}".
                                format(name, self._get_tasklet_name(), e))

    @asyncio.coroutine
    def on_delete_prepare(self, name):
        """Ask the project whether it can be deleted and return its answer."""
        self._log.debug("Project {} being deleted for tasklet {}".
                        format(name, self._get_tasklet_name()))
        rc = yield from self._get_project(name).delete_prepare()
        return rc

    def register(self):
        """Register the project config handler with DTS."""
        self.project_cfg_handler.register()