3 # Copyright 2016-2017 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
import asyncio
import concurrent.futures

from rift.mano.utils.project import ManoProject

import gi
gi.require_version("RwImageMgmtYang", "1.0")
from gi.repository import (
    RwImageMgmtYang,
)
gi.require_version('RwKeyspec', '1.0')
from gi.repository.RwKeyspec import quoted_key
class UploadJobError(Exception):
    """Base class for the errors raised while creating or tracking an upload job."""
class UploadJobFailed(UploadJobError):
    """Raised when an upload job reports the FAILED status."""
class UploadJobCancelled(UploadJobFailed):
    """Raised when an upload job reports the CANCELLED status."""
class UploadJobClient(object):
    """An upload job DTS client.

    This class wraps the DTS upload job actions to be more easily reused across
    various components.
    """

    def __init__(self, log, loop, dts):
        """Remember the collaborators used by every job operation.

        Arguments:
            log - Logger used for debug output
            loop - Event loop on which the job coroutines are scheduled
            dts - DTS handle used to issue the create-upload-job RPC
        """
        self._log = log
        self._loop = loop
        self._dts = dts

    def create_job(self, image_name, image_checksum, project, cloud_account_names=None):
        """Create an image upload job and return an UploadJob instance.

        This is a generator-based coroutine (it uses ``yield from``), so it has
        to be driven by the event loop rather than called like a function.

        Arguments:
            image_name - The name of the image in the image catalog
            image_checksum - The checksum of the image in the catalog
            project - Name of the project the upload job belongs to
            cloud_account_names - Names of the cloud accounts to upload the image to.
                                  None uploads the image to all cloud accounts.

        Returns:
            An UploadJob instance

        Raises:
            UploadJobError: The RPC did not produce any response
        """
        self._log.debug("Project {}: Create image upload job for image {} to {}".
                        format(project, image_name, cloud_account_names))

        create_job_msg = RwImageMgmtYang.YangInput_RwImageMgmt_CreateUploadJob.from_dict({
            "project_name": project,
            # NOTE(review): the image fields are assumed to nest under
            # "onboarded_image" in the create-upload-job RPC input; confirm
            # against the rw-image-mgmt yang model.
            "onboarded_image": {
                "image_name": image_name,
                "image_checksum": image_checksum,
            },
        })

        if cloud_account_names is not None:
            create_job_msg.cloud_account = cloud_account_names

        # NOTE(review): assumes query_rpc takes (xpath, flags, message);
        # confirm against the DTS API.
        query_iter = yield from self._dts.query_rpc(
            "I,/rw-image-mgmt:create-upload-job",
            0,
            create_job_msg,
        )

        # Start from None so an empty response set is reported explicitly
        # instead of surfacing as an UnboundLocalError further down.
        rpc_result = None
        for fut_resp in query_iter:
            rpc_result = (yield from fut_resp).result

        if rpc_result is None:
            raise UploadJobError(
                "did not get a response to create-upload-job for image: %s" % image_name)

        return UploadJob(self._log, self._loop, self._dts, rpc_result.job_id, project)

    def create_job_threadsafe(self, image_name, image_checksum, project, cloud_account_names=None):
        """A thread-safe, synchronous wrapper for create_job.

        The coroutine is scheduled on the client's event loop and the calling
        thread blocks until it finishes. Because the call blocks on the loop
        doing the work, it must be made from a thread other than the one
        running the loop.

        Arguments:
            Same as create_job.

        Returns:
            Whatever create_job returns (an UploadJob instance)

        Raises:
            Any exception raised by create_job, or
            concurrent.futures.CancelledError if the task was cancelled.
        """
        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # Every outcome must resolve `future`; leaving it pending would
            # block the waiting thread forever.
            if asyncio_future.cancelled():
                future.cancel()
                return

            error = asyncio_future.exception()
            if error is not None:
                future.set_exception(error)
            else:
                future.set_result(asyncio_future.result())

        def add_task():
            # Runs on the loop thread. A failure to even schedule the task is
            # forwarded to the caller rather than lost in the loop's handler.
            try:
                task = self._loop.create_task(
                    self.create_job(image_name, image_checksum, project, cloud_account_names)
                )
            except Exception as exc:
                future.set_exception(exc)
                return
            task.add_done_callback(on_done)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()
class UploadJob(object):
    """A handle for an image upload job."""

    # Seconds to wait between two status reads while the job is still running.
    _POLL_INTERVAL_SECS = .5

    def __init__(self, log, loop, dts, job_id, project):
        """Bind the handle to one upload job.

        Arguments:
            log - Logger used for progress and error messages
            loop - Event loop on which the wait coroutine is scheduled
            dts - DTS handle used to read the job status
            job_id - Identifier of the upload job
            project - Name of the project the job belongs to
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._job_id = job_id
        self._project = project

    def wait_until_complete(self):
        """Wait until the upload job reaches a terminal state.

        This is a generator-based coroutine (it uses ``yield from``). It reads
        the job status through DTS, returns once the status is COMPLETED and
        otherwise sleeps briefly before reading again.

        Raises:
            UploadJobError: No status response was received for the job
            UploadJobFailed: The upload job failed
            UploadJobCancelled: The upload job was cancelled
        """
        self._log.debug("waiting for upload job %s to complete", self._job_id)
        xpath = ManoProject.prefix_project(
            "D,/rw-image-mgmt:upload-jobs/" +
            "rw-image-mgmt:job[rw-image-mgmt:id={}]".format(quoted_key(str(self._job_id))),
            project=self._project,
        )

        while True:
            query_iter = yield from self._dts.query_read(xpath)
            job_status_msg = None
            for fut_resp in query_iter:
                job_status_msg = (yield from fut_resp).result

            if job_status_msg is None:
                # Format the message here: extra positional arguments to an
                # exception are stored as a tuple, not interpolated.
                raise UploadJobError(
                    "did not get a status response for job_id: %s" % self._job_id)

            status = job_status_msg.status
            if status == "COMPLETED":
                msg = "upload job %s completed successfully" % self._job_id
                self._log.debug(msg)
                return

            if status == "FAILED":
                msg = "upload job %s as not successful: %s" % (self._job_id, status)
                self._log.error(msg)
                raise UploadJobFailed(msg)

            if status == "CANCELLED":
                msg = "upload job %s was cancelled" % self._job_id
                self._log.error(msg)
                raise UploadJobCancelled(msg)

            # Any other status means the job is still in progress.
            # NOTE: the explicit loop= argument is only accepted by Python < 3.10.
            yield from asyncio.sleep(self._POLL_INTERVAL_SECS, loop=self._loop)

    def wait_until_complete_threadsafe(self):
        """A thread-safe, synchronous wrapper for wait_until_complete.

        The coroutine is scheduled on the job's event loop and the calling
        thread blocks until it finishes. Because the call blocks on the loop
        doing the work, it must be made from a thread other than the one
        running the loop.

        Raises:
            Any exception raised by wait_until_complete, or
            concurrent.futures.CancelledError if the task was cancelled.
        """
        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # Every outcome must resolve `future`; leaving it pending would
            # block the waiting thread forever.
            if asyncio_future.cancelled():
                future.cancel()
                return

            error = asyncio_future.exception()
            if error is not None:
                future.set_exception(error)
                return

            future.set_result(asyncio_future.result())

        def add_task():
            # Runs on the loop thread. A failure to even schedule the task is
            # forwarded to the caller rather than lost in the loop's handler.
            try:
                task = self._loop.create_task(self.wait_until_complete())
            except Exception as exc:
                future.set_exception(exc)
                return
            task.add_done_callback(on_done)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()