4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
# asynctest looks for selectors under its own package, but in
# Python 3.3 it exists under the asyncio package.
# Put the asyncio package directory at the end of the module search path so
# that modules living inside it can also be imported by their bare names.
sys.path += [asyncio.__path__[0]]
# Pin the GObject-introspection namespace versions before anything is
# imported from gi.repository.
for _gi_namespace in ("RwDts", "RwImageMgmtYang"):
    gi.require_version(_gi_namespace, "1.0")
38 from gi
.repository
import (
46 from rift
.tasklets
.rwimagemgr
import tasklet
47 from rift
.tasklets
.rwimagemgr
import upload
48 from rift
.mano
.utils
.project
import ManoProject
, DEFAULT_PROJECT
50 from rift
.test
.dts
import async_test
52 import utest_image_upload
def create_job_controller_mock():
    """Build a mock standing in for ``upload.ImageUploadJobController``.

    Returns:
        A ``unittest.mock.Mock`` created with ``ImageUploadJobController`` as
        its spec, so only attributes of the real controller can be accessed.
    """
    jc_mock = unittest.mock.Mock(upload.ImageUploadJobController)

    # Hand the mock back to the caller; without this the function returns
    # None and the test case cannot configure ``create_job`` / ``pb_msg``.
    return jc_mock
def create_upload_task_creator_mock():
    """Build a coroutine mock standing in for the upload task creator.

    Returns:
        An ``asynctest.CoroutineMock`` whose spec is limited to the single
        attribute ``create_tasks_from_onboarded_create_rpc``.
    """
    creator_mock = asynctest.CoroutineMock(
        spec=["create_tasks_from_onboarded_create_rpc"],
    )

    # Hand the mock back to the caller; without this the function returns
    # None and the RPC handler would be constructed without a task creator.
    return creator_mock
class RwImageRPCTestCase(rift.test.dts.AbstractDTSTest):
    """DTS test case for the image-manager RPC and show handlers.

    The handlers from ``tasklet`` are wired to a mocked upload-task creator
    and a mocked job controller, and are exercised through a second,
    client-side DTS instance.
    """

    @classmethod
    def configure_schema(cls):
        """Return the schema obtained from ``RwImageMgmtYang.get_schema()``."""
        return RwImageMgmtYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout applied to the tests of this case."""
        # NOTE(review): the timeout value is not visible in the reviewed
        # source; 240 is an assumed value (presumably seconds) -- confirm.
        return 240

    def configure_test(self, loop, test_id):
        """Create the project, mocks, handlers and client DTS for one test.

        Args:
            loop: Passed in by the test framework; this method uses
                ``self.loop`` instead.
            test_id: Passed in by the test framework; not used here.
        """
        self.log.debug("STARTING - %s", self.id())
        self.tinfo = self.new_tinfo(self.id())

        # Project under test, backed by its own DTS instance and a single
        # cloud account named 'mock'.
        self.project = ManoProject(self.log, name=DEFAULT_PROJECT)
        self.project._dts = rift.tasklets.DTS(
            self.tinfo, self.schema, self.loop
        )
        self.project.cloud_accounts = {'mock'}

        # Handlers under test, wired to mocked collaborators.
        self.task_creator_mock = create_upload_task_creator_mock()
        self.job_controller_mock = create_job_controller_mock()
        self.rpc_handler = tasklet.ImageDTSRPCHandler(
            self.project,
            object(),
            self.task_creator_mock,
            self.job_controller_mock,
        )
        self.show_handler = tasklet.ImageDTSShowHandler(
            self.project,
            self.job_controller_mock,
        )

        # Separate client-side DTS instance used to issue the queries.
        self.tinfo_c = self.new_tinfo(self.id() + "_client")
        self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)

        self._upload_mixin = utest_image_upload.UploadTaskMixin(
            self.log, self.loop
        )
        self._image_mock_mixin = utest_image_upload.ImageMockMixin(self)

    # The body is a ``yield from`` generator, so it must be wrapped by
    # ``async_test``; undecorated, unittest would only create the generator
    # object and never execute any of the assertions below.
    @async_test
    def test_create_job(self):
        """Issue create-upload-job and read the upload jobs back.

        Checks that the RPC reply carries the job id returned by the mocked
        job controller, that the task creator was invoked, and that reading
        upload-jobs returns the single job published through ``pb_msg``.
        """
        yield from self.rpc_handler.register()
        yield from self.show_handler.register()

        account = self._image_mock_mixin.account
        with self._upload_mixin.create_upload_task(account) as upload_task:
            creator = self.task_creator_mock
            creator.create_tasks_from_onboarded_create_rpc.return_value = [
                upload_task
            ]
            self.job_controller_mock.create_job.return_value = 2

            # The read assertions below expect exactly one job with id 2
            # holding one upload task, so the message is shaped accordingly.
            jobs_cls = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs
            jobs_msg = jobs_cls.from_dict({
                "job": [
                    {
                        "id": 2,
                        "upload_tasks": [upload_task.pb_msg],
                        "status": "COMPLETED",
                    },
                ],
            })
            # A property has to be installed on the mock's type, not on the
            # instance, for attribute access to go through PropertyMock.
            type(self.job_controller_mock).pb_msg = unittest.mock.PropertyMock(
                return_value=jobs_msg,
            )

            # NOTE(review): the "onboarded_image" container name is assumed
            # from the rw-image-mgmt model -- confirm against the yang file.
            rpc_cls = RwImageMgmtYang.YangInput_RwImageMgmt_CreateUploadJob
            create_job_msg = rpc_cls.from_dict({
                "cloud_account": [upload_task.cloud_account],
                "onboarded_image": {
                    "image_name": upload_task.image_name,
                    "image_checksum": upload_task.image_checksum,
                },
                "project_name": self.project.name,
            })

            # NOTE(review): a flags value of 0 is assumed for the second
            # argument -- confirm against the DTS.query_rpc signature.
            query_iter = yield from self.dts_c.query_rpc(
                "I,/rw-image-mgmt:create-upload-job",
                0,
                create_job_msg,
            )

            rpc_result = None
            for fut_resp in query_iter:
                rpc_result = (yield from fut_resp).result

            # Fail with a clear message instead of a NameError when the RPC
            # produced no response at all.
            self.assertIsNotNone(rpc_result)
            self.assertEqual(2, rpc_result.job_id)

            # Assert on the flag; a bare ``.called`` expression is discarded.
            self.assertTrue(
                creator.create_tasks_from_onboarded_create_rpc.called
            )

            query_iter = yield from self.dts_c.query_read(
                self.project.add_project("D,/rw-image-mgmt:upload-jobs"),
            )

            for fut_resp in query_iter:
                rpc_result = (yield from fut_resp).result
                self.assertEqual(1, len(rpc_result.job))
                self.assertEqual(2, rpc_result.job[0].id)
                self.assertEqual(1, len(rpc_result.job[0].upload_tasks))
def main(argv=sys.argv[1:]):
    """Parse the command line, configure logging and run the unit tests.

    Args:
        argv: Command-line arguments without the program name. ``-v`` /
            ``--verbose`` selects DEBUG logging (ERROR otherwise) and
            ``-n`` / ``--no-runner`` skips the XML test runner. Arguments
            that are not recognised here are forwarded to ``unittest.main``.

    Raises:
        KeyError: If the XML runner is used and the ``RIFT_MODULE_TEST``
            environment variable is not set.
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Honour --no-runner: passing testRunner=None lets unittest fall back to
    # its default runner, and RIFT_MODULE_TEST is only required when the XML
    # report is actually going to be written.
    if args.no_runner:
        runner = None
    else:
        runner = xmlrunner.XMLTestRunner(
            output=os.environ["RIFT_MODULE_TEST"]
        )

    # Set the global logging level
    logging.getLogger().setLevel(
        logging.DEBUG if args.verbose else logging.ERROR
    )

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
175 if __name__
== '__main__':