3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
26 import rift
.mano
.dts
as store
28 gi
.require_version('RwDtsYang', '1.0')
29 from gi
.repository
import (
30 RwLaunchpadYang
as launchpadyang
,
32 RwProjectVnfdYang
as RwVnfdYang
,
35 RwProjectNsdYang
as RwNsdYang
,
38 gi
.require_version('RwKeyspec', '1.0')
39 from gi
.repository
.RwKeyspec
import quoted_key
42 class DescriptorPublisher(object):
43 def __init__(self
, log
, dts
, loop
):
48 self
._registrations
= []
51 def publish(self
, w_path
, path
, desc
):
52 ready_event
= asyncio
.Event(loop
=self
.loop
)
55 def on_ready(regh
, status
):
56 self
.log
.debug("Create element: %s, obj-type:%s obj:%s",
57 path
, type(desc
), desc
)
58 with self
.dts
.transaction() as xact
:
59 regh
.create_element(path
, desc
, xact
.xact
)
60 self
.log
.debug("Created element: %s, obj:%s", path
, desc
)
63 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
67 self
.log
.debug("Registering path: %s, obj:%s", w_path
, desc
)
68 reg
= yield from self
.dts
.register(
71 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
73 self
._registrations
.append(reg
)
74 self
.log
.debug("Registered path : %s", w_path
)
75 yield from ready_event
.wait()
79 def unpublish_all(self
):
80 self
.log
.debug("Deregistering all published descriptors")
81 for reg
in self
._registrations
:
84 class SubscriberStoreDtsTestCase(rift
.test
.dts
.AbstractDTSTest
):
86 def configure_schema(cls
):
87 return launchpadyang
.get_schema()
90 def configure_timeout(cls
):
93 def configure_test(self
, loop
, test_id
):
94 self
.log
.debug("STARTING - %s", test_id
)
95 self
.tinfo
= self
.new_tinfo(str(test_id
))
96 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
98 self
.tinfo_sub
= self
.new_tinfo(str(test_id
) + "_sub")
99 self
.dts_sub
= rift
.tasklets
.DTS(self
.tinfo_sub
, self
.schema
, self
.loop
)
101 self
.store
= store
.SubscriberStore(self
.log
, self
.dts
, self
.loop
)
102 self
.publisher
= DescriptorPublisher(self
.log
, self
.dts
, self
.loop
)
107 @rift.test
.dts
.async_test
108 def test_vnfd_handler(self
):
109 yield from self
.store
.register()
111 mock_vnfd
= RwVnfdYang
.YangData_RwProject_Project_VnfdCatalog_Vnfd()
112 mock_vnfd
.id = str(uuid
.uuid1())
114 w_xpath
= "C,/rw-project:project/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
115 xpath
= "{}[project-vnfd:id={}]".format(w_xpath
, quoted_key(mock_vnfd
.id))
116 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_vnfd
)
118 yield from asyncio
.sleep(5, loop
=self
.loop
)
119 assert len(self
.store
.vnfd
) == 1
120 assert self
.store
.get_vnfd(self
.store
.vnfd
[0].id) is not None
122 yield from self
.dts
.query_update(xpath
, rwdts
.XactFlag
.ADVISE
, mock_vnfd
)
123 assert len(self
.store
.vnfd
) == 1
125 yield from self
.dts
.query_delete(xpath
, flags
=rwdts
.XactFlag
.ADVISE
)
126 assert len(self
.store
.vnfd
) == 0
128 @rift.test
.dts
.async_test
129 def test_vnfr_handler(self
):
130 yield from self
.store
.register()
132 mock_vnfr
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr()
133 mock_vnfr
.id = str(uuid
.uuid1())
135 w_xpath
= "D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr"
136 xpath
= "{}[vnfr:id={}]".format(w_xpath
, quoted_key(mock_vnfr
.id))
137 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_vnfr
)
139 yield from asyncio
.sleep(5, loop
=self
.loop
)
140 assert len(self
.store
.vnfr
) == 1
141 assert self
.store
.get_vnfr(self
.store
.vnfr
[0].id) is not None
143 yield from self
.dts
.query_update(xpath
, rwdts
.XactFlag
.ADVISE
, mock_vnfr
)
144 yield from asyncio
.sleep(5, loop
=self
.loop
)
145 assert len(self
.store
.vnfr
) == 1
147 yield from self
.dts
.query_delete(xpath
, flags
=rwdts
.XactFlag
.ADVISE
)
148 yield from asyncio
.sleep(5, loop
=self
.loop
)
149 assert len(self
.store
.vnfr
) == 0
151 @rift.test
.dts
.async_test
152 def test_nsr_handler(self
):
153 yield from self
.store
.register()
155 mock_nsr
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr()
156 mock_nsr
.ns_instance_config_ref
= str(uuid
.uuid1())
157 mock_nsr
.name_ref
= "Foo"
159 w_xpath
= "D,/rw-project:project/nsr:ns-instance-opdata/nsr:nsr"
160 xpath
= "{}[nsr:ns-instance-config-ref={}]".format(w_xpath
, quoted_key(mock_nsr
.ns_instance_config_ref
))
161 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_nsr
)
163 yield from asyncio
.sleep(5, loop
=self
.loop
)
164 assert len(self
.store
.nsr
) == 1
165 assert self
.store
.get_nsr(self
.store
.nsr
[0].ns_instance_config_ref
) is not None
167 yield from self
.dts
.query_update(xpath
, rwdts
.XactFlag
.ADVISE
, mock_nsr
)
168 yield from asyncio
.sleep(5, loop
=self
.loop
)
169 assert len(self
.store
.nsr
) == 1
171 yield from self
.dts
.query_delete(xpath
, flags
=rwdts
.XactFlag
.ADVISE
)
172 yield from asyncio
.sleep(5, loop
=self
.loop
)
173 assert len(self
.store
.nsr
) == 0
175 @rift.test
.dts
.async_test
176 def test_nsd_handler(self
):
177 yield from self
.store
.register()
179 mock_nsd
= RwNsdYang
.YangData_RwProject_Project_NsdCatalog_Nsd()
180 mock_nsd
.id = str(uuid
.uuid1())
182 w_xpath
= "C,/rw-project:project/project-nsd:nsd-catalog/project-nsd:nsd"
183 xpath
= "{}[project-nsd:id={}]".format(w_xpath
, quoted_key(mock_nsd
.id))
184 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_nsd
)
186 yield from asyncio
.sleep(2, loop
=self
.loop
)
187 assert len(self
.store
.nsd
) == 1
188 assert self
.store
.get_nsd(self
.store
.nsd
[0].id) is not None
190 yield from self
.dts
.query_update(xpath
, rwdts
.XactFlag
.ADVISE
, mock_nsd
)
191 yield from asyncio
.sleep(5, loop
=self
.loop
)
192 assert len(self
.store
.nsd
) == 1
194 yield from self
.dts
.query_delete(xpath
, flags
=rwdts
.XactFlag
.ADVISE
)
195 yield from asyncio
.sleep(5, loop
=self
.loop
)
196 assert len(self
.store
.nsd
) == 0
198 @rift.test
.dts
.async_test
199 def test_vnfr_crash(self
):
200 vnf_handler
= store
.VnfrCatalogSubscriber(self
.log
, self
.dts
, self
.loop
)
201 def get_reg_flags(self
):
202 from gi
.repository
import RwDts
as rwdts
203 return rwdts
.Flag
.SUBSCRIBER|rwdts
.Flag
.DELTA_READY|rwdts
.Flag
.CACHE
205 vnf_handler
.get_reg_flags
= types
.MethodType(get_reg_flags
, vnf_handler
)
208 yield from vnf_handler
.register()
210 mock_vnfr
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr()
211 mock_vnfr
.id = str(uuid
.uuid1())
213 def mon_xpath(param_id
=None):
214 """ Monitoring params xpath """
215 return("D,/rw-project:project/vnfr:vnfr-catalog" +
216 "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(mock_vnfr
.id)) +
217 "/vnfr:monitoring-param" +
218 ("[vnfr:id={}]".format(quoted_key(param_id
)) if param_id
else ""))
221 w_xpath
= "D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr"
222 xpath
= "{}[vnfr:id={}]".format(w_xpath
, quoted_key(mock_vnfr
.id))
223 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_vnfr
)
225 mock_param
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam
.from_dict({
228 mock_vnfr
.monitoring_param
.append(mock_param
)
229 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_vnfr
)
231 def main(argv
=sys
.argv
[1:]):
233 # The unittest framework requires a program name, so use the name of this
234 # file instead (we do not want to have to pass a fake program name to main
235 # when this is called from the interpreter).
237 argv
=[__file__
] + argv
,
238 testRunner
=None#xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
241 if __name__
== '__main__':