4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
23 #asynctest looks for selectors under it's own package but in
24 #python3.3 it exists under the asyncio package
26 sys
.path
.append(asyncio
.__path
__[0])
36 gi
.require_version("RwDts", "1.0")
37 gi
.require_version("RwImageMgmtYang", "1.0")
38 from gi
.repository
import (
46 from rift
.tasklets
.rwimagemgr
import tasklet
47 from rift
.tasklets
.rwimagemgr
import upload
48 from rift
.mano
.utils
.project
import ManoProject
, DEFAULT_PROJECT
50 from rift
.test
.dts
import async_test
52 import utest_image_upload
55 def create_job_controller_mock():
56 jc_mock
= unittest
.mock
.Mock(upload
.ImageUploadJobController
)
61 def create_upload_task_creator_mock():
62 creator_mock
= asynctest
.CoroutineMock(spec
=["create_tasks_from_onboarded_create_rpc"])
67 class RwImageRPCTestCase(rift
.test
.dts
.AbstractDTSTest
):
69 def configure_schema(cls
):
70 return RwImageMgmtYang
.get_schema()
73 def configure_timeout(cls
):
76 def configure_test(self
, loop
, test_id
):
77 self
.log
.debug("STARTING - %s", self
.id())
78 self
.tinfo
= self
.new_tinfo(self
.id())
79 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
80 self
.project
= ManoProject(self
.log
, name
=DEFAULT_PROJECT
)
82 self
.task_creator_mock
= create_upload_task_creator_mock()
83 self
.job_controller_mock
= create_job_controller_mock()
84 self
.rpc_handler
= tasklet
.ImageDTSRPCHandler(
85 self
.log
, self
.loop
, self
.dts
, {'mock', None}, object(), self
.task_creator_mock
,
86 self
.job_controller_mock
, self
.project
,
88 self
.show_handler
= tasklet
.ImageDTSShowHandler(
89 self
.log
, self
.loop
, self
.dts
, self
.job_controller_mock
, self
.project
,
92 self
.tinfo_c
= self
.new_tinfo(self
.id() + "_client")
93 self
.dts_c
= rift
.tasklets
.DTS(self
.tinfo_c
, self
.schema
, self
.loop
)
95 self
._upload
_mixin
= utest_image_upload
.UploadTaskMixin(self
.log
, self
.loop
)
96 self
._image
_mock
_mixin
= utest_image_upload
.ImageMockMixin(self
)
99 def test_create_job(self
):
100 yield from self
.rpc_handler
.register()
101 yield from self
.show_handler
.register()
103 account
= self
._image
_mock
_mixin
.account
104 with self
._upload
_mixin
.create_upload_task(account
) as upload_task
:
105 self
.task_creator_mock
.create_tasks_from_onboarded_create_rpc
.return_value
= [upload_task
]
106 self
.job_controller_mock
.create_job
.return_value
= 2
107 type(self
.job_controller_mock
).pb_msg
= unittest
.mock
.PropertyMock(
108 return_value
=RwImageMgmtYang
.UploadJobs
.from_dict({
112 "upload_tasks": [upload_task
.pb_msg
],
113 "status": "COMPLETED"
119 create_job_msg
= RwImageMgmtYang
.CreateUploadJob
.from_dict({
120 "cloud_account": [upload_task
.cloud_account
],
122 "image_name": upload_task
.image_name
,
123 "image_checksum": upload_task
.image_checksum
,
125 "project_name": self
.project
.name
,
128 query_iter
= yield from self
.dts_c
.query_rpc(
129 "I,/rw-image-mgmt:create-upload-job",
134 for fut_resp
in query_iter
:
135 rpc_result
= (yield from fut_resp
).result
137 self
.assertEqual(2, rpc_result
.job_id
)
140 self
.task_creator_mock
.create_tasks_from_onboarded_create_rpc
.called
143 query_iter
= yield from self
.dts_c
.query_read(
144 self
.project
.add_project("D,/rw-image-mgmt:upload-jobs"),
147 for fut_resp
in query_iter
:
148 rpc_result
= (yield from fut_resp
).result
149 self
.assertEqual(1, len(rpc_result
.job
))
150 self
.assertEqual(2, rpc_result
.job
[0].id)
151 self
.assertEqual(1, len(rpc_result
.job
[0].upload_tasks
))
154 def main(argv
=sys
.argv
[1:]):
155 logging
.basicConfig(format
='TEST %(message)s')
157 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
158 parser
= argparse
.ArgumentParser()
159 parser
.add_argument('-v', '--verbose', action
='store_true')
160 parser
.add_argument('-n', '--no-runner', action
='store_true')
162 args
, unknown
= parser
.parse_known_args(argv
)
166 # Set the global logging level
167 logging
.getLogger().setLevel(logging
.DEBUG
if args
.verbose
else logging
.ERROR
)
169 # The unittest framework requires a program name, so use the name of this
170 # file instead (we do not want to have to pass a fake program name to main
171 # when this is called from the interpreter).
172 unittest
.main(argv
=[__file__
] + unknown
+ ["-v"], testRunner
=runner
)
174 if __name__
== '__main__':