3 # Copyright 2017 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 gi
.require_version('RwDts', '1.0')
23 from rift
.mano
.utils
.project
import get_add_delete_update_cfgs
25 from gi
.repository
import (
31 from . import accounts
33 class ROAccountConfigCallbacks(object):
35 on_add_apply
=None, on_delete_apply
=None):
38 def prepare_noop(*args
, **kwargs
):
41 def apply_noop(*args
, **kwargs
):
44 self
.on_add_apply
= on_add_apply
45 self
.on_delete_apply
= on_delete_apply
47 for f
in ('on_add_apply', 'on_delete_apply'):
48 ref
= getattr(self
, f
)
50 setattr(self
, f
, apply_noop
)
53 if asyncio
.iscoroutinefunction(ref
):
54 raise ValueError('%s cannot be a coroutine' % (f
,))
56 class ROAccountConfigSubscriber(object):
57 XPATH
= "C,/rw-ro-account:ro-account/rw-ro-account:account"
59 def __init__(self
, dts
, log
, loop
, project
, records_publisher
, ro_callbacks
):
63 self
._project
= project
64 self
._records
_publisher
= records_publisher
65 self
._ro
_callbacks
= ro_callbacks
69 self
._log
.debug("Inside RO Account Config Subscriber init")
71 def add_account(self
, account_msg
):
72 self
._log
.debug("adding ro account: {}".format(account_msg
))
74 account
= accounts
.ROAccount(self
._dts
,
78 self
._records
_publisher
,
80 self
.accounts
[account
.name
] = account
81 self
._ro
_callbacks
.on_add_apply(account
)
83 def delete_account(self
, account_name
):
84 self
._log
.debug("Deleting RO account: {}".format(account_name
))
85 account
= self
.accounts
[account_name
]
86 del self
.accounts
[account_name
]
87 self
._ro
_callbacks
.on_delete_apply(account_name
)
90 self
._log
.debug("Project {}: De-register ro account handler".
91 format(self
._project
))
93 self
._reg
.deregister()
96 def update_account(self
, account
):
97 """ Update an existing ro account
99 In order to simplify update, turn an update into a delete followed by
100 an add. The drawback to this approach is that we will not support
101 updates of an "in-use" ro account, but this seems like a
102 reasonable trade-off.
105 self
._log
.debug("updating ro account: {}".format(account
))
107 self
.delete_account(account
.name
)
108 self
.add_account(account
)
113 def apply_config(dts
, acg
, xact
, action
, scratch
):
114 self
._log
.debug("Got ro account apply config (xact: %s) (action: %s)", xact
, action
)
116 if xact
.xact
is None:
117 if action
== rwdts
.AppconfAction
.INSTALL
:
118 curr_cfg
= self
._reg
.elements
120 self
._log
.debug("RO account being re-added after restart.")
121 self
.add_account(cfg
)
123 self
._log
.debug("No xact handle. Skipping apply config")
127 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
128 dts_member_reg
=self
._reg
,
134 for cfg
in delete_cfgs
:
135 self
.delete_account(cfg
.name
)
139 self
.add_account(cfg
)
142 for cfg
in update_cfgs
:
143 self
.update_account(cfg
)
146 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
147 """ Prepare callback from DTS for RO Account """
149 self
._log
.debug("RO account on_prepare config received (action: %s): %s",
150 xact_info
.query_action
, msg
)
152 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
153 except rift
.tasklets
.dts
.ResponseError
as e
:
155 "Subscriber DTS prepare for project {}, action {} in class {} failed: {}".
156 format(self
._project
, xact_info
.query_action
, self
.__class
__, e
))
158 self
._log
.debug("Registering for RO Account config using xpath: %s",
159 ROAccountConfigSubscriber
.XPATH
,
161 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
162 on_apply
=apply_config
,
165 xpath
= self
._project
.add_project(ROAccountConfigSubscriber
.XPATH
)
166 with self
._dts
.appconf_group_create(acg_handler
) as acg
:
167 self
._reg
= acg
.register(
169 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
170 on_prepare
=on_prepare
,