7ba4f76a8bf18d7c2f93119647488ecae29eae1e
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / test / utest_dts_handlers.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20
21 import asyncio
22
# asynctest looks for selectors under its own package, but in
# python3.3 it exists under the asyncio package
25 import sys
26 sys.path.append(asyncio.__path__[0])
27 import asynctest
28
29 import logging
30 import os
31 import unittest
32 import unittest.mock
33 import xmlrunner
34
35 import gi
36 gi.require_version("RwDts", "1.0")
37 gi.require_version("RwImageMgmtYang", "1.0")
38 from gi.repository import (
39 RwDts,
40 RwImageMgmtYang,
41 )
42
43 import rift.tasklets
44 import rift.test.dts
45
46 from rift.tasklets.rwimagemgr import tasklet
47 from rift.tasklets.rwimagemgr import upload
48
49 from rift.test.dts import async_test
50
51 import utest_image_upload
52
53
def create_job_controller_mock():
    """Build a stand-in for the image upload job controller.

    Returns:
        A ``unittest.mock.Mock`` whose spec is
        ``upload.ImageUploadJobController``.
    """
    return unittest.mock.Mock(spec=upload.ImageUploadJobController)
58
59
def create_upload_task_creator_mock():
    """Build a coroutine mock standing in for the upload task creator.

    Returns:
        An ``asynctest.CoroutineMock`` whose spec lists the single
        attribute ``create_tasks_from_onboarded_create_rpc``.
    """
    allowed_attrs = ["create_tasks_from_onboarded_create_rpc"]
    return asynctest.CoroutineMock(spec=allowed_attrs)
64
65
class RwImageRPCTestCase(rift.test.dts.AbstractDTSTest):
    """DTS-level test of the image manager RPC and show handlers.

    The handlers under test are wired to mocked collaborators (an upload
    task creator and a job controller) and are queried through a second
    DTS handle that plays the role of the client.
    """

    @classmethod
    def configure_schema(cls):
        """Return the RwImageMgmtYang schema used by this test case."""
        return RwImageMgmtYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout for this test case (presumably in seconds)."""
        return 240

    def configure_test(self, loop, test_id):
        """Create the handlers under test, their mocks and a client DTS.

        Args:
            loop: Event loop passed in by the caller; this method uses
                ``self.loop`` rather than the argument.
            test_id: Test identifier passed in by the caller (unused here).
        """
        self.log.debug("STARTING - %s", self.id())
        self.tinfo = self.new_tinfo(self.id())
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)

        self.task_creator_mock = create_upload_task_creator_mock()
        self.job_controller_mock = create_job_controller_mock()
        # NOTE(review): {'mock', None} is a set literal, not a dict; confirm
        # whether {'mock': None} was intended for the accounts argument.
        self.rpc_handler = tasklet.ImageDTSRPCHandler(
            self.log, self.loop, self.dts, {'mock', None}, object(), self.task_creator_mock,
            self.job_controller_mock
        )
        self.show_handler = tasklet.ImageDTSShowHandler(
            self.log, self.loop, self.dts, self.job_controller_mock
        )

        # Separate DTS handle used to issue queries as a client.
        self.tinfo_c = self.new_tinfo(self.id() + "_client")
        self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)

        self._upload_mixin = utest_image_upload.UploadTaskMixin(self.log, self.loop)
        self._image_mock_mixin = utest_image_upload.ImageMockMixin(self)

    @async_test
    def test_create_job(self):
        """Issue create-upload-job, then read upload-jobs back.

        Checks that the RPC returns the job id handed out by the mocked job
        controller, that the task creator mock was invoked, and that the
        read of upload-jobs returns the single mocked job with one upload
        task.
        """
        yield from self.rpc_handler.register()
        yield from self.show_handler.register()

        account = self._image_mock_mixin.account
        with self._upload_mixin.create_upload_task(account) as upload_task:
            self.task_creator_mock.create_tasks_from_onboarded_create_rpc.return_value = [upload_task]
            self.job_controller_mock.create_job.return_value = 2
            # Properties have to be installed on the mock's type, not on
            # the instance, for attribute access to go through them.
            type(self.job_controller_mock).pb_msg = unittest.mock.PropertyMock(
                return_value=RwImageMgmtYang.UploadJobs.from_dict({
                    "job": [
                        {
                            "id": 2,
                            "upload_tasks": [upload_task.pb_msg],
                            "status": "COMPLETED"
                        }
                    ]
                })
            )

            create_job_msg = RwImageMgmtYang.CreateUploadJob.from_dict({
                "cloud_account": [upload_task.cloud_account],
                "onboarded_image": {
                    "image_name": upload_task.image_name,
                    "image_checksum": upload_task.image_checksum,
                }
            })

            query_iter = yield from self.dts_c.query_rpc(
                "I,/rw-image-mgmt:create-upload-job",
                0,
                create_job_msg,
            )

            # Start from None so that an empty response set produces a
            # clear assertion failure instead of a NameError below.
            rpc_result = None
            for fut_resp in query_iter:
                rpc_result = (yield from fut_resp).result

            self.assertIsNotNone(
                rpc_result, "create-upload-job RPC returned no result"
            )
            self.assertEqual(2, rpc_result.job_id)

            self.assertTrue(
                self.task_creator_mock.create_tasks_from_onboarded_create_rpc.called
            )

            query_iter = yield from self.dts_c.query_read(
                "D,/rw-image-mgmt:upload-jobs",
            )

            # Count responses so the test cannot pass vacuously when the
            # read query yields nothing.
            read_responses = 0
            for fut_resp in query_iter:
                read_result = (yield from fut_resp).result
                read_responses += 1
                self.assertEqual(1, len(read_result.job))
                self.assertEqual(2, read_result.job[0].id)
                self.assertEqual(1, len(read_result.job[0].upload_tasks))

            self.assertGreater(
                read_responses, 0, "upload-jobs read returned no responses"
            )
149
150
def main(argv=sys.argv[1:]):
    """Run this module's unit tests.

    Args:
        argv: Command-line arguments. ``-v``/``--verbose`` sets the root
            logger to DEBUG (ERROR otherwise); ``-n``/``--no-runner`` passes
            no test runner to ``unittest.main`` instead of the XML runner.
            Unrecognized arguments are forwarded to ``unittest.main``.

    Raises:
        KeyError: If the XML runner is requested and ``RIFT_MODULE_TEST``
            is not set in the environment.
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Build the XML runner only when it will actually be used, so that
    # --no-runner works without RIFT_MODULE_TEST in the environment.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
170
# Run the test suite only when executed as a script, not when imported.
if __name__ == "__main__":
    main()