4 # Copyright 2017 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
24 gi
.require_version('RwProjectYang', '1.0')
25 gi
.require_version('RwDtsYang', '1.0')
26 from gi
.repository
import (
35 class ManoProjectError(Exception):
39 class ManoProjNameSetErr(ManoProjectError
):
43 class ManoProjXpathNoProjErr(ManoProjectError
):
47 class ManoProjXpathKeyErr(ManoProjectError
):
51 class ManoProjXpathNotRootErr(ManoProjectError
):
55 class ManoProjXpathPresentErr(ManoProjectError
):
61 NS_PROJECT
= '{}:{}'.format(NS
, PROJECT
)
62 XPATH
= '/{}'.format(NS_PROJECT
)
63 XPATH_LEN
= len(XPATH
)
67 NS_NAME
= '{}:{}'.format(NS
, NAME
)
69 DEFAULT_PROJECT
= 'default'
70 DEFAULT_PREFIX
= "{}[{}='{}']".format(XPATH
,
75 class ManoProject(object):
76 '''Class to handle the project name'''
81 def instance_from_xpath(cls
, xpath
, log
):
82 name
= cls
.from_xpath(xpath
, log
)
86 proj
= ManoProject(log
, name
=name
)
90 def from_xpath(cls
, xpath
, log
):
91 log
.debug("Get project name from {}".format(xpath
));
94 idx
= xpath
.find(XPATH
)
96 msg
= "Project not found in XPATH: {}".format(xpath
)
98 raise ManoProjXpathNoProjErr(msg
)
100 sub
= xpath
[idx
+XPATH_LEN
:].strip()
101 if (len(sub
) < NAME_LEN
) or (sub
[0] != '['):
102 msg
= "Project name not found in XPath: {}".format(xpath
)
104 raise ManoProjXpathKeyErr(msg
)
106 sub
= sub
[1:].strip()
107 idx
= sub
.find(NS_NAME
)
111 msg
= "Project name not found in XPath: {}".format(xpath
)
113 raise ManoProjXpathKeyErr(msg
)
117 msg
= "XPath is invalid: {}".format(xpath
)
119 raise ManoProjXpathKeyErr(msg
)
121 sub
= sub
[:idx
].strip()
123 log
.debug("Key and value found: {}".format(sub
))
124 k
, n
= sub
.split("=", 2)
125 name
= n
.strip(' \'"')
127 msg
= "Project name is empty in XPath".format(xpath
)
129 raise ManoProjXpathKeyErr (msg
)
131 log
.debug("Found project name {} from XPath {}".
135 except ValueError as e
:
136 msg
= "Project name not found in XPath: {}, exception: {}" \
139 raise ManoProjXpathKeyErr(msg
)
141 msg
= "Project not found in XPATH: {}".format(xpath
)
143 raise ManoProjXpathNoProjErr(msg
)
148 cls
.log
= logging
.getLogger('rw-mano-log.rw-project')
149 cls
.log
.setLevel(logging
.ERROR
)
152 def prefix_project(cls
, xpath
, project
=None, log
=None):
157 project
= DEFAULT_PROJECT
158 proj_prefix
= DEFAULT_PREFIX
160 proj_prefix
= "{}[{}='{}']".format(XPATH
,
164 log
.debug("Add project {} to {}".format(project
, xpath
))
168 idx
= xpath
.find('C,/')
170 idx
= xpath
.find('D,/')
178 msg
= "Non-rooted xpath provided: {}".format(xpath
)
180 raise ManoProjXpathNotRootErr(msg
)
182 idx
= suffix
.find(XPATH
)
184 name
= cls
.from_xpath(xpath
, log
)
186 log
.debug("Project already in the XPATH: {}".format(xpath
))
190 msg
= "Different project {} already in XPATH {}". \
193 raise ManoProjXpathPresentErr(msg
)
195 ret
= prefix
+ proj_prefix
+ suffix
196 log
.debug("XPath with project: {}".format(ret
))
200 def __init__(self
, log
, name
=None, tasklet
=None):
210 # Track if the apply config was received
216 def update(self
, tasklet
):
217 # Store the commonly used properties from a tasklet
218 self
._tasklet
= tasklet
219 self
._log
_hdl
= tasklet
.log_hdl
220 self
._dts
= tasklet
.dts
221 self
._loop
= tasklet
.loop
241 return self
._pbcm
.project_config
260 def name(self
, value
):
261 if self
._name
is None:
263 self
._prefix
= "{}[{}='{}']".format(XPATH
,
266 self
._pbcm
= RwProjectYang
.YangData_RwProject_Project(
269 elif self
._name
== value
:
270 self
._log
.debug("Setting the same name again for project {}".
273 msg
= "Project name already set to {}".format(self
._name
)
275 raise ManoProjNameSetErr(msg
)
277 def set_from_xpath(self
, xpath
):
278 self
.name
= ManoProject
.from_xpath(xpath
, self
._log
)
280 def add_project(self
, xpath
):
281 return ManoProject
.prefix_project(xpath
, log
=self
._log
, project
=self
._name
)
285 def delete_prepare(self
):
286 self
._log
.debug("Delete prepare for project {}".format(self
._name
))
292 msg
= "Register not implemented for project type {}". \
293 format(self
.__class
__.__name
__)
295 raise NotImplementedError(msg
)
298 def deregister(self
):
299 msg
= "De-register not implemented for project type {}". \
300 format(self
.__class
__.__name
__)
302 raise NotImplementedError(msg
)
304 def rpc_check(self
, msg
, xact_info
=None):
305 '''Check if the rpc is for this project'''
307 project
= msg
.project_name
308 except AttributeError as e
:
309 project
= DEFAULT_PROJECT
311 if project
!= self
.name
:
312 self
._log
.debug("Project {}: RPC is for different project {}".
313 format(self
.name
, project
))
315 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
321 def create_project(self
, dts
):
322 proj_xpath
= "C,{}/project-config".format(self
.prefix
)
323 self
._log
.info("Creating project: {} with {}".
324 format(proj_xpath
, self
.config
.as_dict()))
326 yield from dts
.query_create(proj_xpath
,
327 rwdts
.XactFlag
.ADVISE
,
331 def get_add_delete_update_cfgs(dts_member_reg
, xact
, key_name
):
332 #TODO: Check why this is getting called during project delete
333 if not dts_member_reg
:
336 # Unforunately, it is currently difficult to figure out what has exactly
337 # changed in this xact without Pbdelta support (RIFT-4916)
338 # As a workaround, we can fetch the pre and post xact elements and
339 # perform a comparison to figure out adds/deletes/updates
340 xact_cfgs
= list(dts_member_reg
.get_xact_elements(xact
))
341 curr_cfgs
= list(dts_member_reg
.elements
)
343 xact_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in xact_cfgs
}
344 curr_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in curr_cfgs
}
347 added_keys
= set(xact_key_map
) - set(curr_key_map
)
348 added_cfgs
= [xact_key_map
[key
] for key
in added_keys
]
351 deleted_keys
= set(curr_key_map
) - set(xact_key_map
)
352 deleted_cfgs
= [curr_key_map
[key
] for key
in deleted_keys
]
355 updated_keys
= set(curr_key_map
) & set(xact_key_map
)
356 updated_cfgs
= [xact_key_map
[key
] for key
in updated_keys
if xact_key_map
[key
] != curr_key_map
[key
]]
358 return added_cfgs
, deleted_cfgs
, updated_cfgs
361 class ProjectConfigCallbacks(object):
363 on_add_apply
=None, on_add_prepare
=None,
364 on_delete_apply
=None, on_delete_prepare
=None):
367 def prepare_noop(*args
, **kwargs
):
370 def apply_noop(*args
, **kwargs
):
373 self
.on_add_apply
= on_add_apply
374 self
.on_add_prepare
= on_add_prepare
375 self
.on_delete_apply
= on_delete_apply
376 self
.on_delete_prepare
= on_delete_prepare
378 for f
in ('on_add_apply', 'on_delete_apply'):
379 ref
= getattr(self
, f
)
381 setattr(self
, f
, apply_noop
)
384 if asyncio
.iscoroutinefunction(ref
):
385 raise ValueError('%s cannot be a coroutine' % (f
,))
387 for f
in ('on_add_prepare', 'on_delete_prepare'):
388 ref
= getattr(self
, f
)
390 setattr(self
, f
, prepare_noop
)
393 if not asyncio
.iscoroutinefunction(ref
):
394 raise ValueError("%s must be a coroutine" % f
)
397 class ProjectDtsHandler(object):
398 XPATH
= "C,{}/project-config".format(XPATH
)
400 def __init__(self
, dts
, log
, callbacks
):
403 self
._callbacks
= callbacks
416 def add_project(self
, name
):
417 self
.log
.info("Adding project: {}".format(name
))
419 if name
not in self
.projects
:
420 self
._callbacks
.on_add_apply(name
)
421 self
.projects
.append(name
)
423 self
.log
.error("Project already present: {}".
426 def delete_project(self
, name
):
427 self
._log
.info("Deleting project: {}".format(name
))
428 if name
in self
.projects
:
429 self
._callbacks
.on_delete_apply(name
)
430 self
.projects
.remove(name
)
432 self
.log
.error("Unrecognized project: {}".
435 def update_project(self
, name
):
436 """ Update an existing project
438 Currently, we do not take any action on MANO for this,
439 so no callbacks are defined
442 msg - The project config message
444 self
._log
.info("Updating project: {}".format(name
))
445 if name
in self
.projects
:
448 self
.log
.error("Unrecognized project: {}".
453 def apply_config(dts
, acg
, xact
, action
, scratch
):
454 self
._log
.debug("Got project apply config (xact: %s) (action: %s)", xact
, action
)
456 if xact
.xact
is None:
457 if action
== rwdts
.AppconfAction
.INSTALL
:
458 curr_cfg
= self
._reg
.elements
460 self
._log
.debug("Project being re-added after restart.")
461 self
.add_project(cfg
)
463 # When RIFT first comes up, an INSTALL is called with the current config
464 # Since confd doesn't actally persist data this never has any data so
466 self
._log
.debug("No xact handle. Skipping apply config")
470 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
471 dts_member_reg
=self
._reg
,
477 for cfg
in delete_cfgs
:
478 self
.delete_project(cfg
.name_ref
)
482 self
.add_project(cfg
.name_ref
)
485 for cfg
in update_cfgs
:
486 self
.update_project(cfg
.name_ref
)
489 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
490 """ Prepare callback from DTS for Project """
492 # xpath = ks_path.to_xpath(RwProjectYang.get_schema())
493 # name = ManoProject.from_xpath(xpath, self._log)
495 # self._log.error("Did not find the project name in ks: {}".
497 # xact_info.respond_xpath(rwdts.XactRspCode.NACK)
500 action
= xact_info
.query_action
502 self
._log
.debug("Project %s on_prepare config received (action: %s): %s",
503 name
, xact_info
.query_action
, msg
)
505 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
506 if name
in self
.projects
:
507 self
._log
.debug("Project {} already exists. Ignore request".
511 self
._log
.debug("Project {}: Invoking on_prepare add request".
513 yield from self
._callbacks
.on_add_prepare(name
)
515 elif action
== rwdts
.QueryAction
.DELETE
:
516 # Check if the entire project got deleted
517 fref
= ProtobufC
.FieldReference
.alloc()
518 fref
.goto_whole_message(msg
.to_pbcm())
519 if fref
.is_field_deleted():
520 if name
in self
.projects
:
521 rc
= yield from self
._callbacks
.on_delete_prepare(name
)
523 self
._log
.error("Project {} should not be deleted".
525 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
527 self
._log
.warning("Delete on unknown project: {}".
531 self
._log
.error("Action (%s) NOT SUPPORTED", action
)
532 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
535 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
537 self
._log
.debug("Registering for project config using xpath: %s",
538 ProjectDtsHandler
.XPATH
,
541 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
542 on_apply
=apply_config
,
545 with self
._dts
.appconf_group_create(acg_handler
) as acg
:
546 self
._reg
= acg
.register(
547 xpath
=ProjectDtsHandler
.XPATH
,
548 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
549 on_prepare
=on_prepare
,
552 class ProjectHandler(object):
553 def __init__(self
, tasklet
, project_class
, **kw
):
554 self
._tasklet
= tasklet
555 self
._log
= tasklet
.log
556 self
._log
_hdl
= tasklet
.log_hdl
557 self
._dts
= tasklet
.dts
558 self
._loop
= tasklet
.loop
559 self
._class
= project_class
562 self
._log
.debug("Creating project config handler")
563 self
.project_cfg_handler
= ProjectDtsHandler(
564 self
._dts
, self
._log
,
565 ProjectConfigCallbacks(
566 on_add_apply
=self
.on_project_added
,
567 on_add_prepare
=self
.on_add_prepare
,
568 on_delete_apply
=self
.on_project_deleted
,
569 on_delete_prepare
=self
.on_delete_prepare
,
573 def _get_tasklet_name(self
):
574 return self
._tasklet
.tasklet_info
.instance_name
576 def _get_project(self
, name
):
578 proj
= self
._tasklet
.projects
[name
]
579 except Exception as e
:
580 self
._log
.exception("Project {} ({})not found for tasklet {}: {}".
581 format(name
, list(self
._tasklet
.projects
.keys()),
582 self
._get
_tasklet
_name
(), e
))
587 def on_project_deleted(self
, name
):
588 self
._log
.debug("Project {} deleted".format(name
))
590 self
._get
_project
(name
).deregister()
591 except Exception as e
:
592 self
._log
.exception("Project {} deregister for {} failed: {}".
593 format(name
, self
._get
_tasklet
_name
(), e
))
596 proj
= self
._tasklet
.projects
.pop(name
)
598 except Exception as e
:
599 self
._log
.exception("Project {} delete for {} failed: {}".
600 format(name
, self
._get
_tasklet
_name
(), e
))
602 def on_project_added(self
, name
):
603 self
._log
.debug("Project {} added to tasklet {}".
604 format(name
, self
._get
_tasklet
_name
()))
605 self
._get
_project
(name
)._apply
= True
608 def on_add_prepare(self
, name
):
609 self
._log
.debug("Project {} to be added to {}".
610 format(name
, self
._get
_tasklet
_name
()))
613 self
._tasklet
.projects
[name
] = \
614 self
._class
(name
, self
._tasklet
, **(self
._kw
))
615 except Exception as e
:
616 self
._log
.exception("Project {} create for {} failed: {}".
617 formatname
, self
._get
_tasklet
_name
(), e())
620 yield from self
._get
_project
(name
).register()
621 except Exception as e
:
622 self
._log
.exception("Project {} register for tasklet {} failed: {}".
623 format(name
, self
._get
_tasklet
_name
(), e
))
626 def on_delete_prepare(self
, name
):
627 self
._log
.debug("Project {} being deleted for tasklet {}".
628 format(name
, self
._get
_tasklet
_name
()))
629 rc
= yield from self
._get
_project
(name
).delete_prepare()
633 self
.project_cfg_handler
.register()