#
#   Copyright 2016 RIFT.IO Inc
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
import asyncio
import concurrent.futures

import gi
gi.require_version("RwImageMgmtYang", "1.0")
from gi.repository import (
    RwImageMgmtYang,
)
class UploadJobError(Exception):
    """ A generic error occurred in an image upload job

    Base class of the upload job exception hierarchy.
    """
class UploadJobFailed(UploadJobError):
    """ The image upload job failed """
class UploadJobCancelled(UploadJobFailed):
    """ The image upload job was cancelled

    Derives from UploadJobFailed, so handlers for UploadJobFailed also
    receive cancellations.
    """
class UploadJobClient(object):
    """ An upload job DTS client

    This class wraps the DTS upload job actions to be more easily reused across
    components.
    """
    def __init__(self, log, loop, dts):
        """ Store the collaborators used to create upload jobs

        Arguments:
            log - Logger passed on to the UploadJob instances that are created
            loop - Event loop on which the threadsafe wrapper schedules tasks
            dts - DTS handle used to issue the create-upload-job rpc
        """
        self._log = log
        self._loop = loop
        self._dts = dts

    def create_job(self, image_name, image_checksum, cloud_account_names=None):
        """ Create an image upload_job and return an UploadJob instance

        This is a generator-based coroutine (it uses ``yield from``), so it
        has to be driven by the event loop or by another coroutine.

        Arguments:
            image_name - The name of the image in the image catalog
            image_checksum - The checksum of the image in the catalog
            cloud_account_names - Names of the cloud accounts to upload the image to.
                                  None uploads the image to all cloud accounts.

        Returns:
            An UploadJob instance

        Raises:
            UploadJobError: The rpc did not produce any response
        """
        # NOTE(review): the image fields are nested under "onboarded_image";
        # confirm against the rw-image-mgmt create-upload-job input schema.
        create_job_msg = RwImageMgmtYang.CreateUploadJob.from_dict({
            "onboarded_image": {
                "image_name": image_name,
                "image_checksum": image_checksum,
            }
        })

        if cloud_account_names is not None:
            create_job_msg.cloud_account = cloud_account_names

        # NOTE(review): the 0 is presumably the DTS query flags argument
        # (none set) - confirm against the DTS query_rpc signature.
        query_iter = yield from self._dts.query_rpc(
            "I,/rw-image-mgmt:create-upload-job",
            0,
            create_job_msg,
        )

        # Keep the last response; stay at None when nothing came back so the
        # failure is reported explicitly instead of as an unbound local.
        rpc_result = None
        for fut_resp in query_iter:
            rpc_result = (yield from fut_resp).result

        if rpc_result is None:
            raise UploadJobError(
                "did not get a create-upload-job response for image: %s" % image_name
            )

        job_id = rpc_result.job_id

        return UploadJob(self._log, self._loop, self._dts, job_id)

    def create_job_threadsafe(self, image_name, image_checksum, cloud_account_names=None):
        """ A thread-safe, synchronous wrapper for create_job

        Schedules create_job on the event loop and blocks the calling thread
        until it finishes. Because it blocks, calling it from the event loop's
        own thread would prevent the scheduled task from ever running.

        Arguments:
            Same as create_job

        Returns:
            Whatever create_job returned (an UploadJob instance)

        Raises:
            concurrent.futures.CancelledError: The scheduled task was cancelled
            Exception: Any exception raised while scheduling or running create_job
        """
        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # Every path must resolve `future`; otherwise the thread blocked
            # in future.result() below never wakes up.
            if asyncio_future.cancelled():
                # exception()/result() raise on a cancelled task, so forward
                # the cancellation instead of querying them.
                future.cancel()
                return

            exc = asyncio_future.exception()
            if exc is not None:
                future.set_exception(exc)
            else:
                future.set_result(asyncio_future.result())

        def add_task():
            # Runs on the event loop thread.
            try:
                task = self._loop.create_task(
                    self.create_job(image_name, image_checksum, cloud_account_names)
                )
                task.add_done_callback(on_done)
            except Exception as e:
                # Report scheduling failures to the waiting thread.
                future.set_exception(e)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()
class UploadJob(object):
    """ A handle for a image upload job """
    def __init__(self, log, loop, dts, job_id):
        """ Bind the handle to an existing upload job

        Arguments:
            log - Logger used to report job progress
            loop - Event loop used for polling delays and threadsafe scheduling
            dts - DTS handle used to read the job status
            job_id - Identifier of the upload job this handle refers to
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._job_id = job_id

    def wait_until_complete(self):
        """ Wait until the upload job reaches a terminal state

        This is a generator-based coroutine (it uses ``yield from``). It polls
        the job status through DTS every half second until the status is
        COMPLETED, FAILED or CANCELLED.

        Raises:
            UploadJobError: A generic exception occured in the upload job
                            (no status response was received)
            UploadJobFailed: The upload job failed
            UploadJobCancelled: The upload job was cancelled
        """
        self._log.debug("waiting for upload job %s to complete", self._job_id)

        # The xpath does not change between polls, so build it once.
        xpath = "D,/rw-image-mgmt:upload-jobs/rw-image-mgmt:job[rw-image-mgmt:id='{}']".format(
            self._job_id
        )

        while True:
            query_iter = yield from self._dts.query_read(xpath)

            # Only the first response is needed for a read keyed by job id.
            job_status_msg = None
            for fut_resp in query_iter:
                job_status_msg = (yield from fut_resp).result
                break

            if job_status_msg is None:
                # Interpolate here: exceptions do not apply logging-style
                # format arguments.
                raise UploadJobError(
                    "did not get a status response for job_id: %s" % self._job_id
                )

            if job_status_msg.status == "COMPLETED":
                msg = "upload job %s completed successfully" % self._job_id
                self._log.debug(msg)
                return

            elif job_status_msg.status == "FAILED":
                msg = "upload job %s was not successful: %s" % (
                    self._job_id, job_status_msg.status
                )
                self._log.error(msg)
                raise UploadJobFailed(msg)

            elif job_status_msg.status == "CANCELLED":
                msg = "upload job %s was cancelled" % self._job_id
                self._log.error(msg)
                raise UploadJobCancelled(msg)

            # Not in a terminal state yet: pause before polling again.
            # NOTE(review): asyncio.sleep() no longer accepts `loop` as of
            # Python 3.10; revisit if the runtime is upgraded.
            yield from asyncio.sleep(.5, loop=self._loop)

    def wait_until_complete_threadsafe(self):
        """ A thread-safe, synchronous wrapper for wait_until_complete

        Schedules wait_until_complete on the event loop and blocks the calling
        thread until it finishes. Because it blocks, calling it from the event
        loop's own thread would prevent the scheduled task from ever running.

        Raises:
            concurrent.futures.CancelledError: The scheduled task was cancelled
            Exception: Any exception raised while scheduling or running
                       wait_until_complete
        """
        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # Every path must resolve `future`; otherwise the thread blocked
            # in future.result() below never wakes up.
            if asyncio_future.cancelled():
                # exception()/result() raise on a cancelled task, so forward
                # the cancellation instead of querying them.
                future.cancel()
                return

            exc = asyncio_future.exception()
            if exc is not None:
                future.set_exception(exc)
                return

            future.set_result(asyncio_future.result())

        def add_task():
            # Runs on the event loop thread.
            try:
                task = self._loop.create_task(self.wait_until_complete())
                task.add_done_callback(on_done)
            except Exception as e:
                # Report scheduling failures to the waiting thread.
                future.set_exception(e)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()