6e431772f56b3157c79d08d7460792ccbc0384f0
4 # Copyright 2017 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
22 from time
import sleep
25 gi
.require_version('RwProjectManoYang', '1.0')
26 gi
.require_version('RwDts', '1.0')
27 from gi
.repository
import (
37 class ManoProjectError(Exception):
41 class ManoProjNameSetErr(ManoProjectError
):
45 class ManoProjXpathNoProjErr(ManoProjectError
):
49 class ManoProjXpathKeyErr(ManoProjectError
):
53 class ManoProjXpathNotRootErr(ManoProjectError
):
57 class ManoProjXpathPresentErr(ManoProjectError
):
63 NS_PROJECT
= '{}:{}'.format(NS
, PROJECT
)
64 XPATH
= '/{}'.format(NS_PROJECT
)
65 XPATH_LEN
= len(XPATH
)
69 NS_NAME
= '{}:{}'.format(NS
, NAME
)
71 DEFAULT_PROJECT
= 'default'
72 DEFAULT_PREFIX
= "{}[{}='{}']".format(XPATH
,
77 class ManoProject(object):
78 '''Class to handle the project name'''
83 def instance_from_xpath(cls
, xpath
, log
):
84 name
= cls
.from_xpath(xpath
, log
)
88 proj
= ManoProject(log
, name
=name
)
92 def from_xpath(cls
, xpath
, log
):
93 log
.debug("Get project name from {}".format(xpath
));
96 idx
= xpath
.find(XPATH
)
98 msg
= "Project not found in XPATH: {}".format(xpath
)
100 raise ManoProjXpathNoProjErr(msg
)
102 sub
= xpath
[idx
+XPATH_LEN
:].strip()
103 if (len(sub
) < NAME_LEN
) or (sub
[0] != '['):
104 msg
= "Project name not found in XPath: {}".format(xpath
)
106 raise ManoProjXpathKeyErr(msg
)
108 sub
= sub
[1:].strip()
109 idx
= sub
.find(NS_NAME
)
113 msg
= "Project name not found in XPath: {}".format(xpath
)
115 raise ManoProjXpathKeyErr(msg
)
119 msg
= "XPath is invalid: {}".format(xpath
)
121 raise ManoProjXpathKeyErr(msg
)
123 sub
= sub
[:idx
].strip()
125 log
.debug("Key and value found: {}".format(sub
))
126 k
, n
= sub
.split("=", 2)
127 name
= n
.strip(' \'"')
129 msg
= "Project name is empty in XPath".format(xpath
)
131 raise ManoProjXpathKeyErr (msg
)
133 log
.debug("Found project name {} from XPath {}".
137 except ValueError as e
:
138 msg
= "Project name not found in XPath: {}, exception: {}" \
141 raise ManoProjXpathKeyErr(msg
)
143 msg
= "Project not found in XPATH: {}".format(xpath
)
145 raise ManoProjXpathNoProjErr(msg
)
150 cls
.log
= logging
.getLogger('rw-mano-log.rw-project')
151 cls
.log
.setLevel(logging
.ERROR
)
154 def prefix_project(cls
, xpath
, project
=None, log
=None):
159 project
= DEFAULT_PROJECT
160 proj_prefix
= DEFAULT_PREFIX
162 proj_prefix
= "{}[{}='{}']".format(XPATH
,
168 idx
= xpath
.find('C,/')
170 idx
= xpath
.find('D,/')
178 msg
= "Non-rooted xpath provided: {}".format(xpath
)
180 raise ManoProjXpathNotRootErr(msg
)
182 idx
= suffix
.find(XPATH
)
184 name
= cls
.from_xpath(xpath
, log
)
186 log
.debug("Project already in the XPATH: {}".format(xpath
))
190 msg
= "Different project {} already in XPATH {}". \
193 raise ManoProjXpathPresentErr(msg
)
195 ret
= prefix
+ proj_prefix
+ suffix
199 def __init__(self
, log
, name
=None, tasklet
=None):
209 # Track if the apply config was received
def update(self, tasklet):
    """Remember the tasklet and cache the handles that are read from it.

    Stores ``tasklet`` itself as ``self._tasklet`` and copies its
    ``log_hdl``, ``dts`` and ``loop`` attributes onto the matching
    underscore-prefixed attributes of this object.
    """
    self._tasklet = tasklet
    # Copy the commonly used tasklet properties, in the order
    # log_hdl -> dts -> loop.
    for attr_name in ("log_hdl", "dts", "loop"):
        setattr(self, "_" + attr_name, getattr(tasklet, attr_name))
240 return self
._pbcm
.project_config
259 def name(self
, value
):
260 if self
._name
is None:
262 self
._prefix
= "{}[{}='{}']".format(XPATH
,
265 self
._pbcm
= RwProjectManoYang
.YangData_RwProject_Project(
268 elif self
._name
== value
:
269 self
._log
.debug("Setting the same name again for project {}".
272 msg
= "Project name already set to {}".format(self
._name
)
274 raise ManoProjNameSetErr(msg
)
def set_from_xpath(self, xpath):
    """Set this project's name from the project key found in ``xpath``.

    The name is extracted by ``ManoProject.from_xpath`` (which is given
    this object's logger) and then assigned to ``self.name``.
    """
    project_name = ManoProject.from_xpath(xpath, self._log)
    self.name = project_name
def add_project(self, xpath):
    """Return the result of prefixing ``xpath`` with this project.

    Delegates to ``ManoProject.prefix_project``, passing this object's
    project name and logger.
    """
    return ManoProject.prefix_project(
        xpath,
        project=self._name,
        log=self._log,
    )
def delete_prepare(self):
    """Log the delete-prepare request and report that it is accepted.

    Returns:
        The tuple ``(True, "True")``.
    """
    message = "Delete prepare for project {}".format(self._name)
    self._log.debug(message)
    return True, "True"
291 msg
= "Register not implemented for project type {}". \
292 format(self
.__class
__.__name
__)
294 raise NotImplementedError(msg
)
297 def deregister(self
):
298 msg
= "De-register not implemented for project type {}". \
299 format(self
.__class
__.__name
__)
301 raise NotImplementedError(msg
)
303 def rpc_check(self
, msg
, xact_info
=None):
304 '''Check if the rpc is for this project'''
306 project
= msg
.project_name
307 except AttributeError as e
:
308 project
= DEFAULT_PROJECT
310 if project
!= self
.name
:
311 self
._log
.debug("Project {}: RPC is for different project {}".
312 format(self
.name
, project
))
314 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
320 def create_project(self
, dts
):
321 proj_xpath
= "C,{}/config".format(self
.prefix
)
322 self
._log
.info("Creating project: {} with {}".
323 format(proj_xpath
, self
.config
.as_dict()))
325 yield from dts
.query_create(proj_xpath
,
326 rwdts
.XactFlag
.ADVISE
,
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Split a transaction's config elements into adds, deletes and updates.

    Args:
        dts_member_reg: registration handle exposing ``elements`` (the
            currently committed configs) and ``get_xact_elements(xact)``
            (the configs as seen inside the transaction). May be falsy.
        xact: transaction passed through to ``get_xact_elements``.
        key_name: name of the attribute that identifies a config element.

    Returns:
        A tuple ``(added_cfgs, deleted_cfgs, updated_cfgs)``:
        * added: transaction elements whose key is not currently present;
        * deleted: current elements whose key is absent from the transaction;
        * updated: transaction elements whose key is present in both views
          but which compare unequal to the current element.
        Three empty lists are returned when ``dts_member_reg`` is falsy.
        Results follow the iteration order of the underlying element
        sequences (on Python versions with insertion-ordered dicts) rather
        than arbitrary set order.
    """
    # TODO: Check why this is getting called during project delete
    if not dts_member_reg:
        return [], [], []

    # Unfortunately, it is currently difficult to figure out what has exactly
    # changed in this xact without Pbdelta support (RIFT-4916)
    # As a workaround, we can fetch the pre and post xact elements and
    # perform a comparison to figure out adds/deletes/updates
    xact_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.get_xact_elements(xact)}
    curr_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.elements}

    # Adds: keys that only exist in the transaction view
    added_cfgs = [cfg for key, cfg in xact_key_map.items()
                  if key not in curr_key_map]

    # Deletes: keys that only exist in the current view
    deleted_cfgs = [cfg for key, cfg in curr_key_map.items()
                    if key not in xact_key_map]

    # Updates: keys in both views whose element content differs
    updated_cfgs = [cfg for key, cfg in xact_key_map.items()
                    if key in curr_key_map and cfg != curr_key_map[key]]

    return added_cfgs, deleted_cfgs, updated_cfgs
360 class ProjectConfigCallbacks(object):
362 on_add_apply
=None, on_add_prepare
=None,
363 on_delete_apply
=None, on_delete_prepare
=None):
366 def prepare_noop(*args
, **kwargs
):
369 def apply_noop(*args
, **kwargs
):
372 self
.on_add_apply
= on_add_apply
373 self
.on_add_prepare
= on_add_prepare
374 self
.on_delete_apply
= on_delete_apply
375 self
.on_delete_prepare
= on_delete_prepare
377 for f
in ('on_add_apply', 'on_delete_apply'):
378 ref
= getattr(self
, f
)
380 setattr(self
, f
, apply_noop
)
383 if asyncio
.iscoroutinefunction(ref
):
384 raise ValueError('%s cannot be a coroutine' % (f
,))
386 for f
in ('on_add_prepare', 'on_delete_prepare'):
387 ref
= getattr(self
, f
)
389 setattr(self
, f
, prepare_noop
)
392 if not asyncio
.iscoroutinefunction(ref
):
393 raise ValueError("%s must be a coroutine" % f
)
396 class ProjectDtsHandler(object):
397 XPATH
= "C,{}/project-config".format(XPATH
)
399 def __init__(self
, dts
, log
, callbacks
, sub_config
=True):
402 self
._callbacks
= callbacks
405 self
.xpath
= ProjectDtsHandler
.XPATH
406 self
._key
= 'name_ref'
408 self
.xpath
= "C,{}".format(XPATH
)
422 def add_project(self
, name
):
423 self
._log
.info("Adding project: {}".format(name
))
425 if name
not in self
.projects
:
426 self
._callbacks
.on_add_apply(name
)
427 self
.projects
.append(name
)
429 self
._log
.error("Project already present: {}".
432 def delete_project(self
, name
):
433 self
._log
.info("Deleting project: {}".format(name
))
434 if name
in self
.projects
:
435 self
._callbacks
.on_delete_apply(name
)
436 self
.projects
.remove(name
)
438 self
._log
.error("Unrecognized project: {}".
441 def update_project(self
, name
):
442 """ Update an existing project
444 Currently, we do not take any action on MANO for this,
445 so no callbacks are defined
448 msg - The project config message
450 self
._log
.info("Updating project: {}".format(name
))
451 if name
in self
.projects
:
454 self
.add_project(name
)
457 def on_init(acg
, xact
, scratch
):
458 self
._log
.debug("on_init")
459 scratch
["projects"] = {
466 def readd_projects(xact
):
467 self
._log
.info("Re-add projects")
469 for cfg
, ks
in self
._reg
.get_xact_elements(xact
, include_keyspec
=True):
470 xpath
= ks
.to_xpath(RwProjectManoYang
.get_schema())
471 self
._log
.debug("Got ks {} for cfg {}".format(xpath
, cfg
.as_dict()))
472 name
= ManoProject
.from_xpath(xpath
, self
._log
)
473 self
._log
.debug("Project to add: {}".format(name
))
474 self
.add_project(name
)
477 def apply_config(dts
, acg
, xact
, action
, scratch
):
478 self
._log
.debug("Got project apply config (xact: %s) (action: %s): %s",
479 xact
, action
, scratch
)
481 if xact
.xact
is None:
482 if action
== rwdts
.AppconfAction
.INSTALL
:
485 self
._log
.debug("No xact handle. Skipping apply config")
490 add_cfgs
= scratch
["projects"]["added"]
495 del_cfgs
= scratch
["projects"]["deleted"]
500 update_cfgs
= scratch
["projects"]["updated"]
506 for name
in del_cfgs
:
507 self
.delete_project(name
)
510 for name
, msg
in add_cfgs
:
511 self
.add_project(name
)
514 for name
, msg
in update_cfgs
:
515 self
.update_project(name
)
518 del scratch
["projects"]
522 return RwTypes
.RwStatus
.SUCCESS
525 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
526 """ Prepare callback from DTS for Project """
528 action
= xact_info
.query_action
529 xpath
= ks_path
.to_xpath(RwProjectManoYang
.get_schema())
530 self
._log
.debug("Project xpath: {}".format(xpath
))
531 name
= ManoProject
.from_xpath(xpath
, self
._log
)
533 self
._log
.debug("Project %s on_prepare config received (action: %s): %s",
534 name
, xact_info
.query_action
, msg
)
536 if action
== rwdts
.QueryAction
.CREATE
:
537 if name
in self
.projects
:
538 self
._log
.debug("Project {} already exists. Ignore request".
541 yield from self
._callbacks
.on_add_prepare(name
)
542 scratch
["projects"]["added"].append((name
, msg
))
544 elif action
== rwdts
.QueryAction
.UPDATE
:
545 if name
in self
.projects
:
546 scratch
["projects"]["updated"].append((name
, msg
))
548 self
._log
.debug("Project {}: Invoking on_prepare add request".
550 yield from self
._callbacks
.on_add_prepare(name
)
551 scratch
["projects"]["added"].append((name
, msg
))
554 elif action
== rwdts
.QueryAction
.DELETE
:
555 # Check if the entire project got deleted
556 fref
= ProtobufC
.FieldReference
.alloc()
557 fref
.goto_whole_message(msg
.to_pbcm())
558 if fref
.is_field_deleted():
559 if name
in self
.projects
:
560 rc
, delete_msg
= yield from self
._callbacks
.on_delete_prepare(name
)
562 self
._log
.error("Project {} should not be deleted. Reason : {}".
563 format(name
, delete_msg
))
565 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
566 ProjectDtsHandler
.XPATH
,
569 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
572 scratch
["projects"]["deleted"].append(name
)
574 self
._log
.warning("Delete on unknown project: {}".
577 self
._log
.error("Action (%s) NOT SUPPORTED", action
)
578 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
580 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
582 self
._log
.debug("Registering for project config using xpath: %s",
583 ProjectDtsHandler
.XPATH
,
586 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
587 on_apply
=apply_config
,
590 with self
._dts
.appconf_group_create(acg_handler
) as acg
:
591 self
._reg
= acg
.register(
592 xpath
=ProjectDtsHandler
.XPATH
,
593 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
594 on_prepare
=on_prepare
,
598 class ProjectHandler(object):
599 def __init__(self
, tasklet
, project_class
, **kw
):
600 self
._tasklet
= tasklet
601 self
._log
= tasklet
.log
602 self
._log
_hdl
= tasklet
.log_hdl
603 self
._dts
= tasklet
.dts
604 self
._loop
= tasklet
.loop
605 self
._class
= project_class
608 self
._log
.debug("Creating project config handler")
609 self
.project_cfg_handler
= ProjectDtsHandler(
610 self
._dts
, self
._log
,
611 ProjectConfigCallbacks(
612 on_add_apply
=self
.on_project_added
,
613 on_add_prepare
=self
.on_add_prepare
,
614 on_delete_apply
=self
.on_project_deleted
,
615 on_delete_prepare
=self
.on_delete_prepare
,
619 def _get_tasklet_name(self
):
620 return self
._tasklet
.tasklet_info
.instance_name
622 def _get_project(self
, name
):
624 proj
= self
._tasklet
.projects
[name
]
625 except Exception as e
:
626 self
._log
.exception("Project {} ({})not found for tasklet {}: {}".
627 format(name
, list(self
._tasklet
.projects
.keys()),
628 self
._get
_tasklet
_name
(), e
))
633 def on_project_deleted(self
, name
):
634 self
._log
.debug("Project {} deleted".format(name
))
636 self
._get
_project
(name
).deregister()
637 except Exception as e
:
638 self
._log
.exception("Project {} deregister for {} failed: {}".
639 format(name
, self
._get
_tasklet
_name
(), e
))
642 proj
= self
._tasklet
.projects
.pop(name
)
644 except Exception as e
:
645 self
._log
.exception("Project {} delete for {} failed: {}".
646 format(name
, self
._get
_tasklet
_name
(), e
))
648 def on_project_added(self
, name
):
649 if name
not in self
._tasklet
.projects
:
651 self
._tasklet
.projects
[name
] = \
652 self
._class
(name
, self
._tasklet
, **(self
._kw
))
653 task
= asyncio
.ensure_future(self
._get
_project
(name
).register(),
656 self
._log
.debug("Project {} register: {}".format(name
, str(task
)))
658 except Exception as e
:
659 self
._log
.exception("Project {} create for {} failed: {}".
660 format(name
, self
._get
_tasklet
_name
(), e
))
663 self
._log
.debug("Project {} added to tasklet {}".
664 format(name
, self
._get
_tasklet
_name
()))
665 self
._get
_project
(name
)._apply
= True
668 def on_add_prepare(self
, name
):
669 self
._log
.debug("Project {} to be added to {}".
670 format(name
, self
._get
_tasklet
_name
()))
671 if name
in self
._tasklet
.projects
:
672 self
._log
.error("Project {} already exists for {}".
673 format(name
, self
._get
_tasklet
_name
()))
677 self
._tasklet
.projects
[name
] = \
678 self
._class
(name
, self
._tasklet
, **(self
._kw
))
679 yield from self
._get
_project
(name
).register()
681 except Exception as e
:
682 self
._log
.exception("Project {} create for {} failed: {}".
683 format(name
, self
._get
_tasklet
_name
(), e
))
def on_delete_prepare(self, name):
    """Delegate the delete-prepare step to the named project.

    Logs the request, then yields from the ``delete_prepare()`` of the
    project returned by ``self._get_project(name)``.

    Returns:
        The ``(rc, delete_msg)`` pair produced by the project's
        ``delete_prepare``.
    """
    tasklet_name = self._get_tasklet_name()
    self._log.debug("Project {} being deleted for tasklet {}".
                    format(name, tasklet_name))
    project = self._get_project(name)
    rc, delete_msg = yield from project.delete_prepare()
    return rc, delete_msg
694 self
.project_cfg_handler
.register()