X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Fdts%2Fsubscriber%2Fcore.py;h=53c2e8cc14aeffab371e26f513a37214f993a7fa;hb=HEAD;hp=dd2513e81af9166db16f0275c5a2bec8c3ea2d2f;hpb=49868d2c71eb364cee9707515be6841a568dad40;p=osm%2FSO.git diff --git a/common/python/rift/mano/dts/subscriber/core.py b/common/python/rift/mano/dts/subscriber/core.py index dd2513e8..53c2e8cc 100644 --- a/common/python/rift/mano/dts/subscriber/core.py +++ b/common/python/rift/mano/dts/subscriber/core.py @@ -1,6 +1,6 @@ """ -# -# Copyright 2016 RIFT.IO Inc +# +# Copyright 2016-2017 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,6 +27,9 @@ import asyncio from gi.repository import (RwDts as rwdts, ProtobufC) import rift.tasklets +from rift.mano.utils.project import ( + get_add_delete_update_cfgs, + ) from ..core import DtsHandler @@ -35,11 +38,11 @@ class SubscriberDtsHandler(DtsHandler): """A common class for all subscribers. """ @classmethod - def from_tasklet(cls, tasklet, callback=None): + def from_project(cls, proj, callback=None): """Convenience method to build the object from tasklet Args: - tasklet (rift.tasklets.Tasklet): Tasklet + proj (rift.mano.utils.project.ManoProject): Project callback (None, optional): Callable, which will be invoked on subscriber changes. @@ -48,20 +51,41 @@ class SubscriberDtsHandler(DtsHandler): msg: The Gi Object msg from DTS action(rwdts.QueryAction): Action type """ - return cls(tasklet.log, tasklet.dts, tasklet.loop, callback=callback) + return cls(proj.log, proj.dts, proj.loop, proj, callback=callback) - def __init__(self, log, dts, loop, callback=None): - super().__init__(log, dts, loop) + def __init__(self, log, dts, loop, project, callback=None): + super().__init__(log, dts, loop, project) self.callback = callback + @abc.abstractmethod + def get_xpath(self): + """ + Returns: + str: xpath + """ + pass + def get_reg_flags(self): """Default set of REG flags, can be over-ridden by sub classes. - + Returns: Set of rwdts.Flag types. """ return rwdts.Flag.SUBSCRIBER|rwdts.Flag.DELTA_READY|rwdts.Flag.CACHE + @asyncio.coroutine + def data(self): + itr = yield from self.dts.query_read( + self.get_xpath()) + + values = [] + for res in itr: + result = yield from res + result = result.result + values.append(result) + + return values + class AbstractOpdataSubscriber(SubscriberDtsHandler): @@ -70,29 +94,32 @@ class AbstractOpdataSubscriber(SubscriberDtsHandler): Opdata subscriber can be created in one step by subclassing and implementing the MANDATORY get_xpath() method - + """ - @abc.abstractmethod - def get_xpath(self): - """ - Returns: - str: xpath - """ - pass @asyncio.coroutine def register(self): """Triggers the registration """ + + if self._reg: + self._log.warning("RPC already registered for project {}". + format(self._project.name)) + return + xacts = {} def on_commit(xact_info): - xact_id = xact_info.handle.get_xact().id - if xact_id in xacts: - msg, action = xacts.pop(xact_id) + try: + xact_id = xact_info.handle.get_xact().id + if xact_id in xacts: + msg, action = xacts.pop(xact_id) - if self.callback: - self.callback(msg, action) + if self.callback: + self.callback(msg, action) + except Exception as e: + self.log.error("Exception when committing data for registration:{} exception:{}".format(self.get_xpath(), e)) + self.log.exception(e) return rwdts.MemberRspCode.ACTION_OK @@ -105,8 +132,11 @@ class AbstractOpdataSubscriber(SubscriberDtsHandler): except Exception as e: self.log.exception(e) - finally: + try: xact_info.respond_xpath(rwdts.XactRspCode.ACK) + except rift.tasklets.dts.ResponseError as e: + self._log.warning("Reg handle is None during action {} for {}: {}". + format(action, self.__class__, e)) reg_event = asyncio.Event(loop=self.loop) @@ -120,17 +150,14 @@ class AbstractOpdataSubscriber(SubscriberDtsHandler): on_commit=on_commit ) - self.reg = yield from self.dts.register( - xpath=self.get_xpath(), + self._reg = yield from self.dts.register( + xpath=self.project.add_project(self.get_xpath()), flags=self.get_reg_flags(), handler=handler) # yield from reg_event.wait() - assert self.reg is not None - - def deregister(self): - self.reg.deregister() + assert self._reg is not None class AbstractConfigSubscriber(SubscriberDtsHandler): @@ -139,7 +166,7 @@ class AbstractConfigSubscriber(SubscriberDtsHandler): Config subscriber can be created in one step by subclassing and implementing the MANDATORY get_xpath() method - + """ KEY = "msgs" @@ -151,42 +178,31 @@ class AbstractConfigSubscriber(SubscriberDtsHandler): def key_name(self): pass - def get_add_delete_update_cfgs(self, dts_member_reg, xact, key_name): - # Unforunately, it is currently difficult to figure out what has exactly - # changed in this xact without Pbdelta support (RIFT-4916) - # As a workaround, we can fetch the pre and post xact elements and - # perform a comparison to figure out adds/deletes/updates - xact_cfgs = list(dts_member_reg.get_xact_elements(xact)) - curr_cfgs = list(dts_member_reg.elements) - - xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs} - curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs} - - # Find Adds - added_keys = set(xact_key_map) - set(curr_key_map) - added_cfgs = [xact_key_map[key] for key in added_keys] - - # Find Deletes - deleted_keys = set(curr_key_map) - set(xact_key_map) - deleted_cfgs = [curr_key_map[key] for key in deleted_keys] - - # Find Updates - updated_keys = set(curr_key_map) & set(xact_key_map) - updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]] - - return added_cfgs, deleted_cfgs, updated_cfgs - @asyncio.coroutine def register(self): """ Register for VNFD configuration""" def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" - is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL - - - add_cfgs, delete_cfgs, update_cfgs = self.get_add_delete_update_cfgs( - dts_member_reg=self.reg, + if xact.xact is None: + if action == rwdts.AppconfAction.INSTALL: + try: + if self._reg: + for cfg in self._reg.elements: + if self.callback: + self.callback(cfg, rwdts.QueryAction.CREATE) + + else: + self._log.error("Reg handle is None during action {} for {}". + format(action, self.__class__)) + + except Exception as e: + self._log.exception("Adding config {} during restart failed: {}". + format(cfg, e)) + return + + add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs( + dts_member_reg=self._reg, xact=xact, key_name=self.key_name()) @@ -202,14 +218,18 @@ class AbstractConfigSubscriber(SubscriberDtsHandler): @asyncio.coroutine def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): """ on prepare callback """ - xact_info.respond_xpath(rwdts.XactRspCode.ACK) + self._log.debug("Subscriber DTS prepare for project %s: %s", + self.project, xact_info.query_action) + try: + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + except rift.tasklets.dts.ResponseError as e: + self._log.warning( + "Subscriber DTS prepare for project {}, action {} in class {} failed: {}". + format(self.project, xact_info.query_action, self.__class__, e)) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self.dts.appconf_group_create(handler=acg_hdl) as acg: - self.reg = acg.register( - xpath=self.get_xpath(), + self._reg = acg.register( + xpath=self.project.add_project(self.get_xpath()), flags=self.get_reg_flags(), on_prepare=on_prepare) - - def deregister(self): - self.reg.deregister()