de495e9a696ffae443bf8ba6ec42097017b01950
4 # Copyright 2017 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
24 gi
.require_version('RwProjectManoYang', '1.0')
25 gi
.require_version('RwDtsYang', '1.0')
26 from gi
.repository
import (
class ManoProjectError(Exception):
    """Base class for every project-related error raised by this module."""


class ManoProjNameSetErr(ManoProjectError):
    """Raised when a project that already has a name is given a different one."""


class ManoProjXpathNoProjErr(ManoProjectError):
    """Raised when an XPath does not contain the project element."""


class ManoProjXpathKeyErr(ManoProjectError):
    """Raised when the project key (name) in an XPath is missing or invalid."""


class ManoProjXpathNotRootErr(ManoProjectError):
    """Raised when a non-rooted XPath is supplied for project prefixing."""


class ManoProjXpathPresentErr(ManoProjectError):
    """Raised when an XPath already carries a different project."""
62 NS_PROJECT
= '{}:{}'.format(NS
, PROJECT
)
63 XPATH
= '/{}'.format(NS_PROJECT
)
64 XPATH_LEN
= len(XPATH
)
68 NS_NAME
= '{}:{}'.format(NS
, NAME
)
70 DEFAULT_PROJECT
= 'default'
71 DEFAULT_PREFIX
= "{}[{}='{}']".format(XPATH
,
76 class ManoProject(object):
77 '''Class to handle the project name'''
82 def instance_from_xpath(cls
, xpath
, log
):
83 name
= cls
.from_xpath(xpath
, log
)
87 proj
= ManoProject(log
, name
=name
)
def from_xpath(cls, xpath, log):
    """Extract the project name from an XPath containing the project key.

    The project element (``XPATH``) must be followed by a key predicate
    of the form ``[<name key>='<project>']``.

    Args:
        xpath: XPath string to parse.
        log: Logger used for debug and error messages.

    Returns:
        The project name with surrounding spaces and quotes stripped.

    Raises:
        ManoProjXpathNoProjErr: The project element is not in the XPath.
        ManoProjXpathKeyErr: The key predicate is missing or malformed, or
            the project name is empty.
    """
    log.debug("Get project name from {}".format(xpath))

    idx = xpath.find(XPATH)
    if idx == -1:
        msg = "Project not found in XPATH: {}".format(xpath)
        log.error(msg)
        raise ManoProjXpathNoProjErr(msg)

    # Whatever follows the project element must open a key predicate.
    sub = xpath[idx + XPATH_LEN:].strip()
    if (len(sub) < NAME_LEN) or (sub[0] != '['):
        msg = "Project name not found in XPath: {}".format(xpath)
        log.error(msg)
        raise ManoProjXpathKeyErr(msg)

    sub = sub[1:].strip()
    # NOTE(review): only the NS_NAME lookup is visible in the reviewed
    # extract; the fallback to the un-prefixed NAME is assumed -- confirm.
    idx = sub.find(NS_NAME)
    if idx == -1:
        idx = sub.find(NAME)
    if idx != 0:
        msg = "Project name not found in XPath: {}".format(xpath)
        log.error(msg)
        raise ManoProjXpathKeyErr(msg)

    # NOTE(review): the search for the closing bracket is assumed from the
    # slice below and the "XPath is invalid" message -- confirm.
    idx = sub.find(']')
    if idx == -1:
        msg = "XPath is invalid: {}".format(xpath)
        log.error(msg)
        raise ManoProjXpathKeyErr(msg)

    sub = sub[:idx].strip()
    log.debug("Key and value found: {}".format(sub))

    try:
        # maxsplit=1 yields exactly (key, value); the previous maxsplit=2
        # produced three parts, and an unpack failure, whenever the value
        # itself contained '='.
        _, value = sub.split("=", 1)
    except ValueError as e:
        msg = "Project name not found in XPath: {}, exception: {}" \
              .format(xpath, e)
        log.exception(msg)
        raise ManoProjXpathKeyErr(msg) from e

    name = value.strip(' \'"')
    if not name:
        # The message previously had no placeholder, so the offending
        # XPath was silently dropped from it.
        msg = "Project name is empty in XPath: {}".format(xpath)
        log.error(msg)
        raise ManoProjXpathKeyErr(msg)

    log.debug("Found project name {} from XPath {}".
              format(name, xpath))
    return name
149 cls
.log
= logging
.getLogger('rw-mano-log.rw-project')
150 cls
.log
.setLevel(logging
.ERROR
)
153 def prefix_project(cls
, xpath
, project
=None, log
=None):
158 project
= DEFAULT_PROJECT
159 proj_prefix
= DEFAULT_PREFIX
161 proj_prefix
= "{}[{}='{}']".format(XPATH
,
165 log
.debug("Add project {} to {}".format(project
, xpath
))
169 idx
= xpath
.find('C,/')
171 idx
= xpath
.find('D,/')
179 msg
= "Non-rooted xpath provided: {}".format(xpath
)
181 raise ManoProjXpathNotRootErr(msg
)
183 idx
= suffix
.find(XPATH
)
185 name
= cls
.from_xpath(xpath
, log
)
187 log
.debug("Project already in the XPATH: {}".format(xpath
))
191 msg
= "Different project {} already in XPATH {}". \
194 raise ManoProjXpathPresentErr(msg
)
196 ret
= prefix
+ proj_prefix
+ suffix
197 log
.debug("XPath with project: {}".format(ret
))
201 def __init__(self
, log
, name
=None, tasklet
=None):
211 # Track if the apply config was received
217 def update(self
, tasklet
):
218 # Store the commonly used properties from a tasklet
219 self
._tasklet
= tasklet
220 self
._log
_hdl
= tasklet
.log_hdl
221 self
._dts
= tasklet
.dts
222 self
._loop
= tasklet
.loop
242 return self
._pbcm
.project_config
261 def name(self
, value
):
262 if self
._name
is None:
264 self
._prefix
= "{}[{}='{}']".format(XPATH
,
267 self
._pbcm
= RwProjectManoYang
.YangData_RwProject_Project(
270 elif self
._name
== value
:
271 self
._log
.debug("Setting the same name again for project {}".
274 msg
= "Project name already set to {}".format(self
._name
)
276 raise ManoProjNameSetErr(msg
)
278 def set_from_xpath(self
, xpath
):
279 self
.name
= ManoProject
.from_xpath(xpath
, self
._log
)
281 def add_project(self
, xpath
):
282 return ManoProject
.prefix_project(xpath
, log
=self
._log
, project
=self
._name
)
286 def delete_prepare(self
):
287 self
._log
.debug("Delete prepare for project {}".format(self
._name
))
293 msg
= "Register not implemented for project type {}". \
294 format(self
.__class
__.__name
__)
296 raise NotImplementedError(msg
)
299 def deregister(self
):
300 msg
= "De-register not implemented for project type {}". \
301 format(self
.__class
__.__name
__)
303 raise NotImplementedError(msg
)
305 def rpc_check(self
, msg
, xact_info
=None):
306 '''Check if the rpc is for this project'''
308 project
= msg
.project_name
309 except AttributeError as e
:
310 project
= DEFAULT_PROJECT
312 if project
!= self
.name
:
313 self
._log
.debug("Project {}: RPC is for different project {}".
314 format(self
.name
, project
))
316 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
322 def create_project(self
, dts
):
323 proj_xpath
= "C,{}/config".format(self
.prefix
)
324 self
._log
.info("Creating project: {} with {}".
325 format(proj_xpath
, self
.config
.as_dict()))
327 yield from dts
.query_create(proj_xpath
,
328 rwdts
.XactFlag
.ADVISE
,
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Classify the config elements touched by a transaction.

    Args:
        dts_member_reg: Registration object exposing
            ``get_xact_elements(xact)`` and ``elements``. May be falsy.
        xact: Transaction whose pending elements are inspected.
        key_name: Name of the attribute that uniquely keys each element.

    Returns:
        A tuple ``(added_cfgs, deleted_cfgs, updated_cfgs)``.
        ``added_cfgs`` and ``updated_cfgs`` hold the transaction's elements
        in transaction order; ``deleted_cfgs`` holds the current elements
        in registration order. All three are empty when
        ``dts_member_reg`` is falsy.
    """
    # TODO: Check why this is getting called during project delete
    if not dts_member_reg:
        return [], [], []

    # Unfortunately, it is currently difficult to figure out what has exactly
    # changed in this xact without Pbdelta support (RIFT-4916).
    # As a workaround, fetch the pre and post xact elements and compare
    # them to figure out adds/deletes/updates.
    xact_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.get_xact_elements(xact)}
    curr_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.elements}

    # Walking the dicts (instead of intermediate sets) keeps the results in
    # a deterministic order.
    added_cfgs = [cfg for key, cfg in xact_key_map.items()
                  if key not in curr_key_map]

    deleted_cfgs = [cfg for key, cfg in curr_key_map.items()
                    if key not in xact_key_map]

    updated_cfgs = [cfg for key, cfg in xact_key_map.items()
                    if key in curr_key_map and cfg != curr_key_map[key]]

    return added_cfgs, deleted_cfgs, updated_cfgs
362 class ProjectConfigCallbacks(object):
364 on_add_apply
=None, on_add_prepare
=None,
365 on_delete_apply
=None, on_delete_prepare
=None):
368 def prepare_noop(*args
, **kwargs
):
371 def apply_noop(*args
, **kwargs
):
374 self
.on_add_apply
= on_add_apply
375 self
.on_add_prepare
= on_add_prepare
376 self
.on_delete_apply
= on_delete_apply
377 self
.on_delete_prepare
= on_delete_prepare
379 for f
in ('on_add_apply', 'on_delete_apply'):
380 ref
= getattr(self
, f
)
382 setattr(self
, f
, apply_noop
)
385 if asyncio
.iscoroutinefunction(ref
):
386 raise ValueError('%s cannot be a coroutine' % (f
,))
388 for f
in ('on_add_prepare', 'on_delete_prepare'):
389 ref
= getattr(self
, f
)
391 setattr(self
, f
, prepare_noop
)
394 if not asyncio
.iscoroutinefunction(ref
):
395 raise ValueError("%s must be a coroutine" % f
)
398 class ProjectDtsHandler(object):
399 XPATH
= "C,{}/project-config".format(XPATH
)
401 def __init__(self
, dts
, log
, callbacks
):
404 self
._callbacks
= callbacks
417 def add_project(self
, name
):
418 self
.log
.info("Adding project: {}".format(name
))
420 if name
not in self
.projects
:
421 self
._callbacks
.on_add_apply(name
)
422 self
.projects
.append(name
)
424 self
.log
.error("Project already present: {}".
427 def delete_project(self
, name
):
428 self
._log
.info("Deleting project: {}".format(name
))
429 if name
in self
.projects
:
430 self
._callbacks
.on_delete_apply(name
)
431 self
.projects
.remove(name
)
433 self
.log
.error("Unrecognized project: {}".
436 def update_project(self
, name
):
437 """ Update an existing project
439 Currently, we do not take any action on MANO for this,
440 so no callbacks are defined
443 msg - The project config message
445 self
._log
.info("Updating project: {}".format(name
))
446 if name
in self
.projects
:
449 self
.log
.error("Unrecognized project: {}".
454 def apply_config(dts
, acg
, xact
, action
, scratch
):
455 self
._log
.debug("Got project apply config (xact: %s) (action: %s)", xact
, action
)
457 if xact
.xact
is None:
458 if action
== rwdts
.AppconfAction
.INSTALL
:
459 curr_cfg
= self
._reg
.elements
461 self
._log
.debug("Project being re-added after restart.")
462 self
.add_project(cfg
)
464 # When RIFT first comes up, an INSTALL is called with the current config
465 # Since confd doesn't actally persist data this never has any data so
467 self
._log
.debug("No xact handle. Skipping apply config")
471 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
472 dts_member_reg
=self
._reg
,
478 for cfg
in delete_cfgs
:
479 self
.delete_project(cfg
.name_ref
)
483 self
.add_project(cfg
.name_ref
)
486 for cfg
in update_cfgs
:
487 self
.update_project(cfg
.name_ref
)
489 return RwTypes
.RwStatus
.SUCCESS
492 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
493 """ Prepare callback from DTS for Project """
495 action
= xact_info
.query_action
496 # xpath = ks_path.to_xpath(RwProjectYang.get_schema())
497 # name = ManoProject.from_xpath(xpath, self._log)
499 # self._log.error("Did not find the project name in ks: {}".
501 # xact_info.respond_xpath(rwdts.XactRspCode.NACK)
504 # if name != msg.name_ref:
505 # self._log.error("The project name {} did not match the name {} in config".
506 # format(name, msg.name_ref))
507 # projects = scratch.setdefault('projects', {
513 # self._log.error("prepare msg type {}".format(type(msg)))
516 self
._log
.debug("Project %s on_prepare config received (action: %s): %s",
517 name
, xact_info
.query_action
, msg
)
519 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
520 if name
in self
.projects
:
521 self
._log
.debug("Project {} already exists. Ignore request".
525 self
._log
.debug("Project {}: Invoking on_prepare add request".
527 yield from self
._callbacks
.on_add_prepare(name
)
530 elif action
== rwdts
.QueryAction
.DELETE
:
531 # Check if the entire project got deleted
532 fref
= ProtobufC
.FieldReference
.alloc()
533 fref
.goto_whole_message(msg
.to_pbcm())
534 if fref
.is_field_deleted():
535 if name
in self
.projects
:
536 rc
= yield from self
._callbacks
.on_delete_prepare(name
)
538 self
._log
.error("Project {} should not be deleted".
540 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
542 self
._log
.warning("Delete on unknown project: {}".
546 self
._log
.error("Action (%s) NOT SUPPORTED", action
)
547 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
550 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
552 self
._log
.debug("Registering for project config using xpath: %s",
553 ProjectDtsHandler
.XPATH
,
556 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
557 on_apply
=apply_config
,
560 with self
._dts
.appconf_group_create(acg_handler
) as acg
:
561 self
._reg
= acg
.register(
562 xpath
=ProjectDtsHandler
.XPATH
,
563 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
564 on_prepare
=on_prepare
,
567 class ProjectHandler(object):
568 def __init__(self
, tasklet
, project_class
, **kw
):
569 self
._tasklet
= tasklet
570 self
._log
= tasklet
.log
571 self
._log
_hdl
= tasklet
.log_hdl
572 self
._dts
= tasklet
.dts
573 self
._loop
= tasklet
.loop
574 self
._class
= project_class
577 self
._log
.debug("Creating project config handler")
578 self
.project_cfg_handler
= ProjectDtsHandler(
579 self
._dts
, self
._log
,
580 ProjectConfigCallbacks(
581 on_add_apply
=self
.on_project_added
,
582 on_add_prepare
=self
.on_add_prepare
,
583 on_delete_apply
=self
.on_project_deleted
,
584 on_delete_prepare
=self
.on_delete_prepare
,
588 def _get_tasklet_name(self
):
589 return self
._tasklet
.tasklet_info
.instance_name
591 def _get_project(self
, name
):
593 proj
= self
._tasklet
.projects
[name
]
594 except Exception as e
:
595 self
._log
.exception("Project {} ({})not found for tasklet {}: {}".
596 format(name
, list(self
._tasklet
.projects
.keys()),
597 self
._get
_tasklet
_name
(), e
))
602 def on_project_deleted(self
, name
):
603 self
._log
.debug("Project {} deleted".format(name
))
605 self
._get
_project
(name
).deregister()
606 except Exception as e
:
607 self
._log
.exception("Project {} deregister for {} failed: {}".
608 format(name
, self
._get
_tasklet
_name
(), e
))
611 proj
= self
._tasklet
.projects
.pop(name
)
613 except Exception as e
:
614 self
._log
.exception("Project {} delete for {} failed: {}".
615 format(name
, self
._get
_tasklet
_name
(), e
))
617 def on_project_added(self
, name
):
618 self
._log
.debug("Project {} added to tasklet {}".
619 format(name
, self
._get
_tasklet
_name
()))
620 self
._get
_project
(name
)._apply
= True
def on_add_prepare(self, name):
    """Create the project object for ``name`` and register it.

    This is a generator-based coroutine body (it uses ``yield from``).
    NOTE(review): the coroutine decorator is presumably on the line
    above this definition; it is not visible here -- confirm.

    Failures while creating or registering the project are logged and
    not re-raised.

    Args:
        name: Name of the project being added.
    """
    self._log.debug("Project {} to be added to {}".
                    format(name, self._get_tasklet_name()))

    try:
        self._tasklet.projects[name] = \
            self._class(name, self._tasklet, **(self._kw))
    except Exception as e:
        # The previous handler read '"...".formatname, ..., e()', which
        # raised AttributeError here and hid the real creation failure.
        self._log.exception("Project {} create for {} failed: {}".
                            format(name, self._get_tasklet_name(), e))

    try:
        yield from self._get_project(name).register()
    except Exception as e:
        self._log.exception("Project {} register for tasklet {} failed: {}".
                            format(name, self._get_tasklet_name(), e))
641 def on_delete_prepare(self
, name
):
642 self
._log
.debug("Project {} being deleted for tasklet {}".
643 format(name
, self
._get
_tasklet
_name
()))
644 rc
= yield from self
._get
_project
(name
).delete_prepare()
648 self
.project_cfg_handler
.register()