update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwstagingmgr / test / utest_publisher_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import gi
22 import logging
23 import os
24 import sys
25 import unittest
26 import uuid
27 import xmlrunner
28
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwStagingMgmtYang', '1.0')
31 from gi.repository import (
32 RwDts as rwdts,
33 RwStagingMgmtYang
34 )
35 import rift.tasklets.rwstagingmgr.publisher as publisher
36 import rift.test.dts
37 from rift.mano.utils.project import ManoProject
38 gi.require_version('RwKeyspec', '1.0')
39 from gi.repository.RwKeyspec import quoted_key
40
41 class TestProject(ManoProject):
42 def __init__(self, log, dts, loop):
43 super().__init__(log)
44 self._dts = dts
45 self._loop = loop
46
47
48 class TestCase(rift.test.dts.AbstractDTSTest):
49 @classmethod
50 def configure_schema(cls):
51 return RwStagingMgmtYang.get_schema()
52
53 @classmethod
54 def configure_timeout(cls):
55 return 240
56
57 def configure_test(self, loop, test_id):
58 self.log.debug("STARTING - %s", test_id)
59 self.tinfo = self.new_tinfo(str(test_id))
60 self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
61 self.project = TestProject(self.log, self.dts, self.loop)
62
63 self.job_handler = publisher.StagingStorePublisher(self.project)
64
65 def tearDown(self):
66 super().tearDown()
67
68 @asyncio.coroutine
69 def get_published_xpaths(self):
70 published_xpaths = set()
71
72 res_iter = yield from self.dts.query_read("D,/rwdts:dts")
73 for i in res_iter:
74 res = (yield from i).result
75 for member in res.member:
76 published_xpaths |= {reg.keyspec for reg in member.state.registration if reg.flags == "publisher"}
77
78 return published_xpaths
79
80 @asyncio.coroutine
81 def read_xpath(self, xpath):
82 itr = yield from self.dts.query_read(xpath)
83
84 result = None
85 for fut in itr:
86 result = yield from fut
87 return result.result
88
89 @rift.test.dts.async_test
90 def test_download_publisher(self):
91 yield from self.job_handler.register()
92 yield from asyncio.sleep(2, loop=self.loop)
93 published_xpaths = yield from self.get_published_xpaths()
94 assert self.job_handler.xpath() in published_xpaths
95 self.job_handler.deregister()
96
97 @rift.test.dts.async_test
98 def test_publish(self):
99 """
100 """
101 yield from self.job_handler.register()
102
103 mock_msg = RwStagingMgmtYang.YangData_RwProject_Project_StagingAreas_StagingArea.from_dict({
104 "area_id": "123"})
105
106 self.job_handler.on_staging_area_create(mock_msg)
107 yield from asyncio.sleep(5, loop=self.loop)
108
109 xpath = self.project.add_project("/staging-areas/staging-area[area-id={}]".
110 format(quoted_key(mock_msg.area_id)))
111 itr = yield from self.dts.query_read(xpath)
112
113
114 result = None
115 for fut in itr:
116 result = yield from fut
117 result = result.result
118
119 print (result)
120 assert result == mock_msg
121 self.job_handler.deregister()
122
123 def main():
124 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
125
126 parser = argparse.ArgumentParser()
127 parser.add_argument('-v', '--verbose', action='store_true')
128 parser.add_argument('-n', '--no-runner', action='store_true')
129 args, unittest_args = parser.parse_known_args()
130 if args.no_runner:
131 runner = None
132
133 TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
134
135 unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
136
137 if __name__ == '__main__':
138 main()