update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / dts / subscriber / test / utest_subscriber_dts.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import gi
20 import sys
21 import types
22 import unittest
23 import uuid
24
25 import rift.test.dts
26 import rift.mano.dts as store
27
28 gi.require_version('RwDtsYang', '1.0')
29 from gi.repository import (
30 RwLaunchpadYang as launchpadyang,
31 RwDts as rwdts,
32 RwProjectVnfdYang as RwVnfdYang,
33 RwVnfrYang,
34 RwNsrYang,
35 RwProjectNsdYang as RwNsdYang,
36 VnfrYang
37 )
38 gi.require_version('RwKeyspec', '1.0')
39 from gi.repository.RwKeyspec import quoted_key
40
41
42 class DescriptorPublisher(object):
43 def __init__(self, log, dts, loop):
44 self.log = log
45 self.loop = loop
46 self.dts = dts
47
48 self._registrations = []
49
50 @asyncio.coroutine
51 def publish(self, w_path, path, desc):
52 ready_event = asyncio.Event(loop=self.loop)
53
54 @asyncio.coroutine
55 def on_ready(regh, status):
56 self.log.debug("Create element: %s, obj-type:%s obj:%s",
57 path, type(desc), desc)
58 with self.dts.transaction() as xact:
59 regh.create_element(path, desc, xact.xact)
60 self.log.debug("Created element: %s, obj:%s", path, desc)
61 ready_event.set()
62
63 handler = rift.tasklets.DTS.RegistrationHandler(
64 on_ready=on_ready
65 )
66
67 self.log.debug("Registering path: %s, obj:%s", w_path, desc)
68 reg = yield from self.dts.register(
69 w_path,
70 handler,
71 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ
72 )
73 self._registrations.append(reg)
74 self.log.debug("Registered path : %s", w_path)
75 yield from ready_event.wait()
76
77 return reg
78
79 def unpublish_all(self):
80 self.log.debug("Deregistering all published descriptors")
81 for reg in self._registrations:
82 reg.deregister()
83
84 class SubscriberStoreDtsTestCase(rift.test.dts.AbstractDTSTest):
85 @classmethod
86 def configure_schema(cls):
87 return launchpadyang.get_schema()
88
89 @classmethod
90 def configure_timeout(cls):
91 return 240
92
93 def configure_test(self, loop, test_id):
94 self.log.debug("STARTING - %s", test_id)
95 self.tinfo = self.new_tinfo(str(test_id))
96 self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
97
98 self.tinfo_sub = self.new_tinfo(str(test_id) + "_sub")
99 self.dts_sub = rift.tasklets.DTS(self.tinfo_sub, self.schema, self.loop)
100
101 self.store = store.SubscriberStore(self.log, self.dts, self.loop)
102 self.publisher = DescriptorPublisher(self.log, self.dts, self.loop)
103
104 def tearDown(self):
105 super().tearDown()
106
107 @rift.test.dts.async_test
108 def test_vnfd_handler(self):
109 yield from self.store.register()
110
111 mock_vnfd = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd()
112 mock_vnfd.id = str(uuid.uuid1())
113
114 w_xpath = "C,/rw-project:project/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
115 xpath = "{}[project-vnfd:id={}]".format(w_xpath, quoted_key(mock_vnfd.id))
116 yield from self.publisher.publish(w_xpath, xpath, mock_vnfd)
117
118 yield from asyncio.sleep(5, loop=self.loop)
119 assert len(self.store.vnfd) == 1
120 assert self.store.get_vnfd(self.store.vnfd[0].id) is not None
121
122 yield from self.dts.query_update(xpath, rwdts.XactFlag.ADVISE, mock_vnfd)
123 assert len(self.store.vnfd) == 1
124
125 yield from self.dts.query_delete(xpath, flags=rwdts.XactFlag.ADVISE)
126 assert len(self.store.vnfd) == 0
127
128 @rift.test.dts.async_test
129 def test_vnfr_handler(self):
130 yield from self.store.register()
131
132 mock_vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
133 mock_vnfr.id = str(uuid.uuid1())
134
135 w_xpath = "D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr"
136 xpath = "{}[vnfr:id={}]".format(w_xpath, quoted_key(mock_vnfr.id))
137 yield from self.publisher.publish(w_xpath, xpath, mock_vnfr)
138
139 yield from asyncio.sleep(5, loop=self.loop)
140 assert len(self.store.vnfr) == 1
141 assert self.store.get_vnfr(self.store.vnfr[0].id) is not None
142
143 yield from self.dts.query_update(xpath, rwdts.XactFlag.ADVISE, mock_vnfr)
144 yield from asyncio.sleep(5, loop=self.loop)
145 assert len(self.store.vnfr) == 1
146
147 yield from self.dts.query_delete(xpath, flags=rwdts.XactFlag.ADVISE)
148 yield from asyncio.sleep(5, loop=self.loop)
149 assert len(self.store.vnfr) == 0
150
151 @rift.test.dts.async_test
152 def test_nsr_handler(self):
153 yield from self.store.register()
154
155 mock_nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr()
156 mock_nsr.ns_instance_config_ref = str(uuid.uuid1())
157 mock_nsr.name_ref = "Foo"
158
159 w_xpath = "D,/rw-project:project/nsr:ns-instance-opdata/nsr:nsr"
160 xpath = "{}[nsr:ns-instance-config-ref={}]".format(w_xpath, quoted_key(mock_nsr.ns_instance_config_ref))
161 yield from self.publisher.publish(w_xpath, xpath, mock_nsr)
162
163 yield from asyncio.sleep(5, loop=self.loop)
164 assert len(self.store.nsr) == 1
165 assert self.store.get_nsr(self.store.nsr[0].ns_instance_config_ref) is not None
166
167 yield from self.dts.query_update(xpath, rwdts.XactFlag.ADVISE, mock_nsr)
168 yield from asyncio.sleep(5, loop=self.loop)
169 assert len(self.store.nsr) == 1
170
171 yield from self.dts.query_delete(xpath, flags=rwdts.XactFlag.ADVISE)
172 yield from asyncio.sleep(5, loop=self.loop)
173 assert len(self.store.nsr) == 0
174
175 @rift.test.dts.async_test
176 def test_nsd_handler(self):
177 yield from self.store.register()
178
179 mock_nsd = RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd()
180 mock_nsd.id = str(uuid.uuid1())
181
182 w_xpath = "C,/rw-project:project/project-nsd:nsd-catalog/project-nsd:nsd"
183 xpath = "{}[project-nsd:id={}]".format(w_xpath, quoted_key(mock_nsd.id))
184 yield from self.publisher.publish(w_xpath, xpath, mock_nsd)
185
186 yield from asyncio.sleep(2, loop=self.loop)
187 assert len(self.store.nsd) == 1
188 assert self.store.get_nsd(self.store.nsd[0].id) is not None
189
190 yield from self.dts.query_update(xpath, rwdts.XactFlag.ADVISE, mock_nsd)
191 yield from asyncio.sleep(5, loop=self.loop)
192 assert len(self.store.nsd) == 1
193
194 yield from self.dts.query_delete(xpath, flags=rwdts.XactFlag.ADVISE)
195 yield from asyncio.sleep(5, loop=self.loop)
196 assert len(self.store.nsd) == 0
197
198 @rift.test.dts.async_test
199 def test_vnfr_crash(self):
200 vnf_handler = store.VnfrCatalogSubscriber(self.log, self.dts, self.loop)
201 def get_reg_flags(self):
202 from gi.repository import RwDts as rwdts
203 return rwdts.Flag.SUBSCRIBER|rwdts.Flag.DELTA_READY|rwdts.Flag.CACHE
204
205 vnf_handler.get_reg_flags = types.MethodType(get_reg_flags, vnf_handler)
206
207 # publish
208 yield from vnf_handler.register()
209
210 mock_vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
211 mock_vnfr.id = str(uuid.uuid1())
212
213 def mon_xpath(param_id=None):
214 """ Monitoring params xpath """
215 return("D,/rw-project:project/vnfr:vnfr-catalog" +
216 "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(mock_vnfr.id)) +
217 "/vnfr:monitoring-param" +
218 ("[vnfr:id={}]".format(quoted_key(param_id)) if param_id else ""))
219
220
221 w_xpath = "D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr"
222 xpath = "{}[vnfr:id={}]".format(w_xpath, quoted_key(mock_vnfr.id))
223 yield from self.publisher.publish(w_xpath, xpath, mock_vnfr)
224
225 mock_param = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict({
226 "id": "1"
227 })
228 mock_vnfr.monitoring_param.append(mock_param)
229 yield from self.publisher.publish(w_xpath, xpath, mock_vnfr)
230
231 def main(argv=sys.argv[1:]):
232
233 # The unittest framework requires a program name, so use the name of this
234 # file instead (we do not want to have to pass a fake program name to main
235 # when this is called from the interpreter).
236 unittest.main(
237 argv=[__file__] + argv,
238 testRunner=None#xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
239 )
240
241 if __name__ == '__main__':
242 main()