update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / test / utest_pkgmgr_subscriber_dts.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import gi
20 import sys
21 import unittest
22 import uuid
23 import os
24
# Unit tests may be launched without RIFT_VAR_ROOT in the environment; in
# that case point it at the unittest area underneath RIFT_INSTALL.
# RIFT_INSTALL itself is only looked up (and must exist) when the fallback
# is actually needed.
if os.environ.get("RIFT_VAR_ROOT") is None:
    os.environ['RIFT_VAR_ROOT'] = os.path.join(
        os.environ['RIFT_INSTALL'],
        'var/rift/unittest',
    )
28
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwPkgMgmtYang', '1.0')
31 from gi.repository import (
32 RwPkgMgmtYang,
33 RwDts as rwdts,
34 )
35 import rift.tasklets.rwpkgmgr.subscriber as pkg_subscriber
36 import rift.test.dts
37 from rift.mano.utils.project import ManoProject, DEFAULT_PROJECT
38
39 gi.require_version('RwKeyspec', '1.0')
40 from gi.repository.RwKeyspec import quoted_key
41
42
class DescriptorPublisher(object):
    """Test helper that publishes objects into DTS and tracks the registrations.

    Every publish() call registers as a publisher on a path and creates one
    element under it. The registration handles are remembered so that
    unpublish_all() can deregister them.
    """
    # TODO: Need to be moved to a central page, too many copy pastes

    def __init__(self, log, dts, loop):
        """Remember the collaborators used by publish() and unpublish_all().

        Arguments:
            log  - logger used for debug tracing
            dts  - DTS handle used to register and to open transactions
            loop - asyncio event loop the internal ready-event is bound to
        """
        # Handles returned by dts.register(), in publication order.
        self._registrations = []

        self.dts = dts
        self.loop = loop
        self.log = log

    @asyncio.coroutine
    def publish(self, w_path, path, desc):
        """Register as publisher on w_path and create desc at path.

        Arguments:
            w_path - path handed to dts.register()
            path   - path of the element created once the registration is ready
            desc   - object stored at path

        Returns:
            The registration handle returned by dts.register(). The coroutine
            only returns after the on_ready callback has created the element.
        """
        element_created = asyncio.Event(loop=self.loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            # Handed to the registration handler as its on_ready callback:
            # creates the element inside a DTS transaction and then releases
            # the waiter at the end of publish().
            self.log.debug("Create element: %s, obj-type:%s obj:%s",
                           path, type(desc), desc)
            with self.dts.transaction() as xact:
                regh.create_element(path, desc, xact.xact)
            self.log.debug("Created element: %s, obj:%s", path, desc)
            element_created.set()

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_ready=on_ready)
        publisher_flags = rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ

        self.log.debug("Registering path: %s, obj:%s", w_path, desc)
        registration = yield from self.dts.register(
            w_path,
            reg_handler,
            flags=publisher_flags,
        )
        self._registrations.append(registration)
        self.log.debug("Registered path : %s", w_path)

        # Block until on_ready has run, so callers can rely on the element
        # having been created when publish() returns.
        yield from element_created.wait()

        return registration

    def unpublish_all(self):
        """Deregister every registration created through publish()."""
        self.log.debug("Deregistering all published descriptors")
        for registration in self._registrations:
            registration.deregister()
85
class SubscriberStoreDtsTestCase(rift.test.dts.AbstractDTSTest):
    """DTS-level test of the package manager's download-status subscriber."""

    @classmethod
    def configure_schema(cls):
        """Return the RwPkgMgmtYang schema for the test to run against."""
        return RwPkgMgmtYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout for this test case (presumably in seconds)."""
        return 240

    def configure_test(self, loop, test_id):
        """Create the per-test DTS handle, publisher helper and project.

        Arguments:
            loop    - event loop supplied by the caller; not used here, the
                      collaborators are bound to self.loop instead
            test_id - identifier of the running test, used for the tasklet info
        """
        self.log.debug("STARTING - %s", test_id)
        self.tinfo = self.new_tinfo(str(test_id))
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
        self.publisher = DescriptorPublisher(self.log, self.dts, self.loop)
        self.project = ManoProject(self.log, name=DEFAULT_PROJECT)

    def tearDown(self):
        """Delegate teardown to the base class."""
        super().tearDown()

    @rift.test.dts.async_test
    def test_download_status_handler(self):
        """A published download job must reach the subscriber's callback.

        Registers a DownloadStatusSubscriber, publishes one download-job
        entry and checks that the callback was invoked with that same
        message.
        """
        mock_msg = RwPkgMgmtYang.YangData_RwProject_Project_DownloadJobs_Job.from_dict({
            "url": "http://foo/bar",
            "package_id": "123",
            "download_id": str(uuid.uuid4())})

        # w_xpath is the project-scoped list path used for the registration;
        # xpath addresses the single job entry keyed by its download-id.
        w_xpath = self.project.add_project("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job")
        xpath = "{}[download-id={}]".format(w_xpath, quoted_key(mock_msg.download_id))

        mock_called = False

        def mock_cb(msg, status):
            nonlocal mock_called
            # unittest assertions instead of bare ``assert``: they are not
            # compiled away under ``python -O`` and report both values on
            # a mismatch.
            self.assertEqual(msg, mock_msg)
            mock_called = True

        sub = pkg_subscriber.DownloadStatusSubscriber(
            self.log,
            self.dts,
            self.loop,
            self.project,
            callback=mock_cb)

        yield from sub.register()
        # Give the subscriber registration time to settle before publishing.
        yield from asyncio.sleep(1, loop=self.loop)

        yield from self.publisher.publish(w_xpath, xpath, mock_msg)
        # Give the published element time to reach the subscriber callback.
        yield from asyncio.sleep(1, loop=self.loop)

        self.assertTrue(
            mock_called,
            "download-status callback was not invoked with the published job")
136
137
def main(argv=sys.argv[1:]):
    """Run the tests of this module through unittest.

    Arguments:
        argv - command-line arguments without the program name; the default
               is sys.argv[1:] as captured when this module was loaded
    """
    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest_argv = [__file__] + argv

    # No explicit runner is supplied. The XML runner,
    # xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"]),
    # is intentionally left disabled here.
    unittest.main(argv=unittest_argv, testRunner=None)


if __name__ == '__main__':
    main()