update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / ro_account / config.py
1
2 #
3 # Copyright 2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import gi
21 gi.require_version('RwDts', '1.0')
22 import rift.tasklets
23 from rift.mano.utils.project import get_add_delete_update_cfgs
24
25 from gi.repository import (
26 RwDts as rwdts,
27 ProtobufC,
28 RwRoAccountYang,
29 )
30
31 from . import accounts
32
class ROAccountConfigCallbacks(object):
    """Container for the synchronous callbacks fired on RO account changes.

    Attributes:
        on_add_apply: callable stored for the "account added" event.
        on_delete_apply: callable stored for the "account deleted" event.

    Any callback left as ``None`` is replaced by a no-op that accepts
    arbitrary positional and keyword arguments, so users of this object can
    always call both attributes unconditionally.
    """

    def __init__(self, on_add_apply=None, on_delete_apply=None):
        """Store the callbacks, defaulting missing ones to a no-op.

        Args:
            on_add_apply: plain (non-coroutine) callable, or None.
            on_delete_apply: plain (non-coroutine) callable, or None.

        Raises:
            ValueError: if a supplied callback is a coroutine function.
        """
        # NOTE: the previous implementation also defined an unused
        # ``@asyncio.coroutine`` no-op here.  It was dead code, and the
        # decorator no longer exists on Python 3.11+, which made this
        # constructor fail before doing any work.

        def apply_noop(*args, **kwargs):
            pass

        self.on_add_apply = on_add_apply
        self.on_delete_apply = on_delete_apply

        for f in ('on_add_apply', 'on_delete_apply'):
            ref = getattr(self, f)
            if ref is None:
                setattr(self, f, apply_noop)
                continue

            # The callbacks are invoked as ordinary function calls, so a
            # coroutine function would never actually run; reject it early.
            if asyncio.iscoroutinefunction(ref):
                raise ValueError('%s cannot be a coroutine' % (f,))
55
class ROAccountConfigSubscriber(object):
    """DTS config subscriber that tracks RO accounts for one project.

    Accounts are kept in ``self.accounts``, keyed by account name.  Every
    add/delete is also reported through the ``ro_callbacks`` object
    (``on_add_apply`` / ``on_delete_apply``).
    """

    XPATH = "C,/rw-ro-account:ro-account/rw-ro-account:account"

    def __init__(self, dts, log, loop, project, records_publisher, ro_callbacks):
        """Store the collaborators; no DTS registration happens here.

        Args:
            dts: DTS handle used to create the appconf group in register().
            log: logger used for debug/error output.
            loop: event loop handed to each created ROAccount.
            project: project object; must provide ``add_project(xpath)``.
            records_publisher: passed through to each created ROAccount.
            ro_callbacks: object exposing ``on_add_apply`` and
                ``on_delete_apply`` callables.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._records_publisher = records_publisher
        self._ro_callbacks = ro_callbacks

        # Registration handle; set by register(), cleared by deregister().
        self._reg = None
        # account name -> accounts.ROAccount
        self.accounts = {}
        self._log.debug("Inside RO Account Config Subscriber init")

    def add_account(self, account_msg):
        """Create an ROAccount from the config message and announce it.

        An existing entry with the same name is overwritten.
        """
        self._log.debug("adding ro account: {}".format(account_msg))

        account = accounts.ROAccount(self._dts,
                                     self._log,
                                     self._loop,
                                     self._project,
                                     self._records_publisher,
                                     account_msg)
        self.accounts[account.name] = account
        self._ro_callbacks.on_add_apply(account)

    def delete_account(self, account_name):
        """Forget the named account and announce the deletion.

        Note that the delete callback receives the account *name*, not the
        account object.

        Raises:
            KeyError: if no account with that name is currently tracked.
        """
        self._log.debug("Deleting RO account: {}".format(account_name))
        # ``del`` raises KeyError for an unknown name, so no separate lookup
        # is needed before removing the entry.
        del self.accounts[account_name]
        self._ro_callbacks.on_delete_apply(account_name)

    def deregister(self):
        """Drop the DTS registration, if one is active."""
        self._log.debug("Project {}: De-register ro account handler".
                        format(self._project))
        if self._reg:
            self._reg.deregister()
            self._reg = None

    def update_account(self, account):
        """ Update an existing ro account

        In order to simplify update, turn an update into a delete followed by
        an add.  The drawback to this approach is that we will not support
        updates of an "in-use" ro account, but this seems like a
        reasonable trade-off.

        Raises:
            KeyError: if the account being updated is not currently tracked
                (propagated from delete_account).
        """
        self._log.debug("updating ro account: {}".format(account))

        self.delete_account(account.name)
        self.add_account(account)

    @asyncio.coroutine
    def register(self):
        """Register with DTS for RO account configuration changes.

        Creates an appconf group whose apply handler keeps ``self.accounts``
        in sync with the configuration, and stores the resulting
        registration in ``self._reg``.
        """

        @asyncio.coroutine
        def apply_config(dts, acg, xact, action, scratch):
            """Apply callback: replay deletes, adds, then updates."""
            self._log.debug("Got ro account apply config (xact: %s) (action: %s)", xact, action)

            if xact.xact is None:
                # No transaction handle: on INSTALL, re-add every element
                # the registration already holds; otherwise nothing to do.
                if action == rwdts.AppconfAction.INSTALL:
                    curr_cfg = self._reg.elements
                    for cfg in curr_cfg:
                        self._log.debug("RO account being re-added after restart.")
                        self.add_account(cfg)
                else:
                    self._log.debug("No xact handle.  Skipping apply config")

                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self._reg,
                xact=xact,
                key_name="name",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_account(cfg.name)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_account(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_account(cfg)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for RO Account """

            self._log.debug("RO account on_prepare config received (action: %s): %s",
                            xact_info.query_action, msg)
            try:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
            except rift.tasklets.dts.ResponseError as e:
                # A failed ACK is logged but deliberately not re-raised.
                self._log.error(
                    "Subscriber DTS prepare for project {}, action {} in class {} failed: {}".
                    format(self._project, xact_info.query_action, self.__class__, e))

        self._log.debug("Registering for RO Account config using xpath: %s",
                        ROAccountConfigSubscriber.XPATH,
                        )
        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        # NOTE(review): add_project() presumably scopes XPATH to this
        # project -- confirm against the project helper.
        xpath = self._project.add_project(ROAccountConfigSubscriber.XPATH)
        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare,
            )