3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
# Unit-test runs need RIFT_VAR_ROOT; when the caller has not provided one,
# point it at the 'var/rift/unittest' directory under RIFT_INSTALL.
# RIFT_INSTALL itself must be set, otherwise this lookup raises KeyError.
if os.environ.get("RIFT_VAR_ROOT") is None:
    os.environ['RIFT_VAR_ROOT'] = os.path.join(
        os.environ['RIFT_INSTALL'],
        'var/rift/unittest',
    )
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwPkgMgmtYang', '1.0')
31 from gi
.repository
import (
35 import rift
.tasklets
.rwpkgmgr
.subscriber
as pkg_subscriber
37 from rift
.mano
.utils
.project
import ManoProject
, DEFAULT_PROJECT
39 gi
.require_version('RwKeyspec', '1.0')
40 from gi
.repository
.RwKeyspec
import quoted_key
class DescriptorPublisher(object):
    """Publish descriptors into DTS on behalf of a test.

    ``publish`` registers this object as a publisher for a path and creates
    one element under it; ``unpublish_all`` deregisters every registration
    made through this instance.
    """
    # TODO: Need to be moved to a central page, too many copy pastes

    def __init__(self, log, dts, loop):
        """Store the collaborators used by the other methods.

        Args:
            log: logger; only ``debug`` is called on it.
            dts: DTS handle; ``register`` and ``transaction`` are called on it.
            loop: asyncio event loop the ready event is bound to.
        """
        # publish() and unpublish_all() read these attributes, so they have
        # to be stored on the instance.
        self.log = log
        self.dts = dts
        self.loop = loop

        # Handles returned by dts.register(), kept so that unpublish_all()
        # can deregister them later.
        self._registrations = []

    def publish(self, w_path, path, desc):
        """Register as publisher for ``w_path`` and create ``desc`` at ``path``.

        This is a generator-based coroutine: drive it with ``yield from``.
        It completes only after the registration's ready callback has created
        the element.

        Args:
            w_path: path the publisher registration is made on.
            path: path of the element to create.
            desc: object stored at ``path``.

        Returns:
            The registration handle returned by ``dts.register``.
        """
        ready_event = asyncio.Event(loop=self.loop)

        # NOTE(review): the damaged source has an unreadable line directly
        # above this callback; if RegistrationHandler requires a coroutine
        # function here, a coroutine decorator has to be restored - confirm.
        def on_ready(regh, status):
            self.log.debug("Create element: %s, obj-type:%s obj:%s",
                           path, type(desc), desc)
            with self.dts.transaction() as xact:
                regh.create_element(path, desc, xact.xact)
            self.log.debug("Created element: %s, obj:%s", path, desc)
            # Wake up publish(); without this the wait() below never returns.
            ready_event.set()

        # NOTE(review): the keyword name is inferred from the callback's name
        # (the argument list was lost from the source) - confirm against
        # rift.tasklets.DTS.RegistrationHandler.
        handler = rift.tasklets.DTS.RegistrationHandler(
            on_ready=on_ready,
        )

        self.log.debug("Registering path: %s, obj:%s", w_path, desc)
        # NOTE(review): only the flags argument survived in the source; the
        # positional path/handler arguments are reconstructed - confirm.
        reg = yield from self.dts.register(
            w_path,
            handler,
            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ,
        )
        self._registrations.append(reg)
        self.log.debug("Registered path : %s", w_path)

        # Block until on_ready has created the element.
        yield from ready_event.wait()

        return reg

    def unpublish_all(self):
        """Deregister every registration created by ``publish``."""
        self.log.debug("Deregistering all published descriptors")
        for reg in self._registrations:
            reg.deregister()
class SubscriberStoreDtsTestCase(rift.test.dts.AbstractDTSTest):
    """DTS test for the package-manager download status subscriber."""

    @classmethod
    def configure_schema(cls):
        """Return the RwPkgMgmt yang schema used by this test."""
        return RwPkgMgmtYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout for this test case."""
        # NOTE(review): the return value was lost from the damaged source;
        # 240 is an assumed value - confirm against the original file.
        return 240

    def configure_test(self, loop, test_id):
        """Create the per-test DTS handle, publisher and project.

        Args:
            loop: event loop passed in by the caller; this method uses
                ``self.loop`` and does not read the argument.
            test_id: identifier of the running test, used to name the
                tasklet info.
        """
        self.log.debug("STARTING - %s", test_id)
        self.tinfo = self.new_tinfo(str(test_id))
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
        self.publisher = DescriptorPublisher(self.log, self.dts, self.loop)
        self.project = ManoProject(self.log, name=DEFAULT_PROJECT)

    @rift.test.dts.async_test
    def test_download_status_handler(self):
        """Publish a download job and check the subscriber callback sees it."""
        # NOTE(review): the damaged source shows a gap inside this dict;
        # further fields may have been lost - confirm.
        mock_msg = RwPkgMgmtYang.YangData_RwProject_Project_DownloadJobs_Job.from_dict({
            "url": "http://foo/bar",
            "download_id": str(uuid.uuid4()),
        })

        w_xpath = self.project.add_project("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job")
        xpath = "{}[download-id={}]".format(w_xpath, quoted_key(mock_msg.download_id))

        # Flipped by the callback so the final assertion can tell whether the
        # subscriber was actually invoked.
        mock_called = False

        def mock_cb(msg, status):
            nonlocal mock_called
            assert msg == mock_msg
            mock_called = True

        # NOTE(review): the argument list was lost from the damaged source;
        # these arguments are reconstructed - confirm against
        # DownloadStatusSubscriber's signature.
        sub = pkg_subscriber.DownloadStatusSubscriber(
            self.log,
            self.dts,
            self.loop,
            self.project,
            callback=mock_cb,
        )

        yield from sub.register()
        yield from asyncio.sleep(1, loop=self.loop)

        yield from self.publisher.publish(w_xpath, xpath, mock_msg)
        yield from asyncio.sleep(1, loop=self.loop)

        assert mock_called is True
def main(argv=sys.argv[1:]):
    """Run this module's tests through ``unittest.main``.

    Args:
        argv: command-line arguments without the program name. Any sequence
            is accepted. Defaults to the arguments the interpreter was
            started with.
    """
    # Imported locally so this function does not depend on a module-level
    # import being present.
    import unittest

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(
        argv=[__file__] + list(argv),
        testRunner=None,  # xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
    )
# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    main()