update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / test / utest_dts_handlers.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20
21 import asyncio
22
23 # asynctest looks for selectors under its own package, but in
24 # python3.3 it exists under the asyncio package
25 import sys
26 sys.path.append(asyncio.__path__[0])
27 import asynctest
28
29 import logging
30 import os
31 import unittest
32 import unittest.mock
33 import xmlrunner
34
35 import gi
36 gi.require_version("RwDts", "1.0")
37 gi.require_version("RwImageMgmtYang", "1.0")
38 from gi.repository import (
39 RwDts,
40 RwImageMgmtYang,
41 )
42
43 import rift.tasklets
44 import rift.test.dts
45
46 from rift.tasklets.rwimagemgr import tasklet
47 from rift.tasklets.rwimagemgr import upload
48 from rift.mano.utils.project import ManoProject, DEFAULT_PROJECT
49
50 from rift.test.dts import async_test
51
52 import utest_image_upload
53
54
def create_job_controller_mock():
    """Build a stand-in for the image upload job controller.

    Returns:
        A ``unittest.mock.Mock`` whose spec is
        ``upload.ImageUploadJobController``.
    """
    return unittest.mock.Mock(spec=upload.ImageUploadJobController)
59
60
def create_upload_task_creator_mock():
    """Build a stand-in for the upload task creator.

    Returns:
        An ``asynctest.CoroutineMock`` whose spec is a one-element list
        naming ``create_tasks_from_onboarded_create_rpc``.
    """
    spec_names = ["create_tasks_from_onboarded_create_rpc"]
    return asynctest.CoroutineMock(spec=spec_names)
65
66
class RwImageRPCTestCase(rift.test.dts.AbstractDTSTest):
    """DTS test of the image manager's RPC and show handlers.

    The handlers under test are constructed with a mocked task creator and a
    mocked job controller; a second DTS instance plays the client that issues
    the queries.
    """

    @classmethod
    def configure_schema(cls):
        """Return the schema obtained from ``RwImageMgmtYang.get_schema()``."""
        schema = RwImageMgmtYang.get_schema()
        return schema

    @classmethod
    def configure_timeout(cls):
        """Return the timeout for this test case (240; presumably seconds)."""
        return 240

    def configure_test(self, loop, test_id):
        """Create the project, mocks, handlers and client DTS for one test.

        Args:
            loop: Not used directly here; ``self.loop`` is used instead.
            test_id: Not used here; names are derived from ``self.id()``.
        """
        test_name = self.id()
        self.log.debug("STARTING - %s", test_name)
        self.tinfo = self.new_tinfo(test_name)

        # Project that the handlers are registered against.
        project = ManoProject(self.log, name=DEFAULT_PROJECT)
        self.project = project
        project._dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
        project.cloud_accounts = {'mock'}

        # Mocked collaborators handed to the handlers under test.
        self.task_creator_mock = create_upload_task_creator_mock()
        self.job_controller_mock = create_job_controller_mock()

        self.rpc_handler = tasklet.ImageDTSRPCHandler(
            project,
            object(),
            self.task_creator_mock,
            self.job_controller_mock,
        )
        self.show_handler = tasklet.ImageDTSShowHandler(
            project,
            self.job_controller_mock,
        )

        # Separate DTS instance used as the querying client.
        self.tinfo_c = self.new_tinfo(test_name + "_client")
        self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)

        self._upload_mixin = utest_image_upload.UploadTaskMixin(self.log, self.loop)
        self._image_mock_mixin = utest_image_upload.ImageMockMixin(self)

    @async_test
    def test_create_job(self):
        """Issue create-upload-job over RPC, then read the job back."""
        yield from self.rpc_handler.register()
        yield from self.show_handler.register()

        expected_job_id = 2
        mock_account = self._image_mock_mixin.account

        with self._upload_mixin.create_upload_task(mock_account) as upload_task:
            # Program the mocks: one task is created, and the controller
            # reports a single job with that task.
            creator = self.task_creator_mock
            creator.create_tasks_from_onboarded_create_rpc.return_value = [upload_task]
            self.job_controller_mock.create_job.return_value = expected_job_id

            jobs_msg = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs.from_dict({
                "job": [
                    {
                        "id": expected_job_id,
                        "upload_tasks": [upload_task.pb_msg],
                        "status": "COMPLETED"
                    }
                ]
            })
            # The property is installed on the mock's type, not the instance.
            type(self.job_controller_mock).pb_msg = unittest.mock.PropertyMock(
                return_value=jobs_msg,
            )

            onboarded_image = {
                "image_name": upload_task.image_name,
                "image_checksum": upload_task.image_checksum,
            }
            create_job_msg = RwImageMgmtYang.YangInput_RwImageMgmt_CreateUploadJob.from_dict({
                "cloud_account": [upload_task.cloud_account],
                "onboarded_image": onboarded_image,
                "project_name": self.project.name,
            })

            rpc_responses = yield from self.dts_c.query_rpc(
                "I,/rw-image-mgmt:create-upload-job",
                0,
                create_job_msg,
            )

            # Keep the result of the last response for the check below.
            for pending in rpc_responses:
                response = yield from pending
                rpc_result = response.result

            self.assertEqual(expected_job_id, rpc_result.job_id)

            self.assertTrue(creator.create_tasks_from_onboarded_create_rpc.called)

            jobs_xpath = self.project.add_project("D,/rw-image-mgmt:upload-jobs")
            read_responses = yield from self.dts_c.query_read(jobs_xpath)

            for pending in read_responses:
                response = yield from pending
                rpc_result = response.result
                self.assertEqual(1, len(rpc_result.job))
                self.assertEqual(expected_job_id, rpc_result.job[0].id)
                self.assertEqual(1, len(rpc_result.job[0].upload_tasks))
153
154
def main(argv=None):
    """Run this module's unit tests.

    Recognized options:
        -v / --verbose    set the root logger to DEBUG instead of ERROR
        -n / --no-runner  use unittest's default runner instead of the
                          XML runner

    Any other arguments are forwarded to ``unittest.main``.

    Args:
        argv: Command-line arguments without the program name. When omitted,
            ``sys.argv[1:]`` is read at call time.

    Raises:
        KeyError: If the XML runner is requested (no ``-n``) and the
            ``RIFT_MODULE_TEST`` environment variable is not set.
    """
    if argv is None:
        argv = sys.argv[1:]

    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Build the XML runner only when it is wanted, so that --no-runner works
    # even when RIFT_MODULE_TEST is not present in the environment.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)


if __name__ == '__main__':
    main()