4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
# asynctest looks for selectors under its own package, but in
# Python 3.3 it exists under the asyncio package.
26 sys
.path
.append(asyncio
.__path
__[0])
36 gi
.require_version("RwDts", "1.0")
37 gi
.require_version("RwImageMgmtYang", "1.0")
38 from gi
.repository
import (
46 from rift
.tasklets
.rwimagemgr
import tasklet
47 from rift
.tasklets
.rwimagemgr
import upload
49 from rift
.test
.dts
import async_test
51 import utest_image_upload
def create_job_controller_mock():
    """Create a mock standing in for ``upload.ImageUploadJobController``.

    The mock is built with ``ImageUploadJobController`` as its spec, so
    accessing an attribute the real controller does not define raises
    ``AttributeError`` instead of silently succeeding.

    Returns:
        unittest.mock.Mock: the spec'd job-controller mock.
    """
    jc_mock = unittest.mock.Mock(upload.ImageUploadJobController)

    # Callers assign the result to an attribute and then configure it
    # (e.g. ``create_job.return_value``), so the mock must be handed back.
    return jc_mock
def create_upload_task_creator_mock():
    """Create a coroutine mock standing in for the upload-task creator.

    The mock's spec is limited to the single attribute name
    ``create_tasks_from_onboarded_create_rpc``.

    Returns:
        The ``asynctest.CoroutineMock`` instance that was created.
    """
    creator_mock = asynctest.CoroutineMock(
        spec=["create_tasks_from_onboarded_create_rpc"]
    )

    # Callers store the result and set return values on it, so the mock
    # must be returned rather than discarded.
    return creator_mock
66 class RwImageRPCTestCase(rift
.test
.dts
.AbstractDTSTest
):
@classmethod
def configure_schema(cls):
    """Return the schema object used by this test case.

    Declared as a classmethod because it takes ``cls`` and needs no
    instance state; it works whether invoked on the class or an instance.

    Returns:
        Whatever ``RwImageMgmtYang.get_schema()`` returns.
    """
    return RwImageMgmtYang.get_schema()
72 def configure_timeout(cls
):
def configure_test(self, loop, test_id):
    """Build the per-test fixtures for the image-manager DTS handlers.

    Creates a tasklet info and DTS handle for the handlers under test,
    constructs the RPC and show handlers around mocked collaborators,
    creates a second ("_client") tasklet info and DTS handle, and finally
    instantiates the upload-task and image-mock helper mixins.

    Args:
        loop: Not used by this method; ``self.loop`` is passed to the
            collaborators instead.
        test_id: Not used by this method; names derive from ``self.id()``.
    """
    self.log.debug("STARTING - %s", self.id())
    self.tinfo = self.new_tinfo(self.id())
    self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)

    # Mocked collaborators shared by both handlers.
    self.task_creator_mock = create_upload_task_creator_mock()
    self.job_controller_mock = create_job_controller_mock()

    # NOTE(review): {'mock', None} is a set literal, not a dict. Confirm
    # which container ImageDTSRPCHandler expects for this argument.
    self.rpc_handler = tasklet.ImageDTSRPCHandler(
        self.log, self.loop, self.dts, {'mock', None}, object(),
        self.task_creator_mock, self.job_controller_mock
    )
    self.show_handler = tasklet.ImageDTSShowHandler(
        self.log, self.loop, self.dts, self.job_controller_mock
    )

    # Separate tasklet info / DTS handle, named with a "_client" suffix.
    self.tinfo_c = self.new_tinfo(self.id() + "_client")
    self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)

    self._upload_mixin = utest_image_upload.UploadTaskMixin(self.log, self.loop)
    self._image_mock_mixin = utest_image_upload.ImageMockMixin(self)
97 def test_create_job(self
):
98 yield from self
.rpc_handler
.register()
99 yield from self
.show_handler
.register()
101 account
= self
._image
_mock
_mixin
.account
102 with self
._upload
_mixin
.create_upload_task(account
) as upload_task
:
103 self
.task_creator_mock
.create_tasks_from_onboarded_create_rpc
.return_value
= [upload_task
]
104 self
.job_controller_mock
.create_job
.return_value
= 2
105 type(self
.job_controller_mock
).pb_msg
= unittest
.mock
.PropertyMock(
106 return_value
=RwImageMgmtYang
.UploadJobs
.from_dict({
110 "upload_tasks": [upload_task
.pb_msg
],
111 "status": "COMPLETED"
117 create_job_msg
= RwImageMgmtYang
.CreateUploadJob
.from_dict({
118 "cloud_account": [upload_task
.cloud_account
],
120 "image_name": upload_task
.image_name
,
121 "image_checksum": upload_task
.image_checksum
,
125 query_iter
= yield from self
.dts_c
.query_rpc(
126 "I,/rw-image-mgmt:create-upload-job",
131 for fut_resp
in query_iter
:
132 rpc_result
= (yield from fut_resp
).result
134 self
.assertEqual(2, rpc_result
.job_id
)
137 self
.task_creator_mock
.create_tasks_from_onboarded_create_rpc
.called
140 query_iter
= yield from self
.dts_c
.query_read(
141 "D,/rw-image-mgmt:upload-jobs",
144 for fut_resp
in query_iter
:
145 rpc_result
= (yield from fut_resp
).result
146 self
.assertEqual(1, len(rpc_result
.job
))
147 self
.assertEqual(2, rpc_result
.job
[0].id)
148 self
.assertEqual(1, len(rpc_result
.job
[0].upload_tasks
))
def main(argv=sys.argv[1:]):
    """Parse the command line and run this module's unit tests.

    Args:
        argv: Command-line arguments without the program name. ``-v`` /
            ``--verbose`` selects DEBUG logging (ERROR otherwise) and
            ``-n`` / ``--no-runner`` disables the XML test runner. Any
            unrecognised arguments are forwarded to ``unittest.main``.

    Raises:
        KeyError: if the XML runner is requested and the
            ``RIFT_MODULE_TEST`` environment variable is not set.
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Honour --no-runner: with testRunner=None unittest falls back to its
    # default text runner. Building the XML runner only when it is wanted
    # also means RIFT_MODULE_TEST is not required for a plain run.
    if args.no_runner:
        runner = None
    else:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
171 if __name__
== '__main__':