3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 import concurrent
.futures
22 gi
.require_version("RwImageMgmtYang", "1.0")
23 from gi
.repository
import (
class UploadJobError(Exception):
    """ Base class for every error raised while handling an image upload job """
class UploadJobFailed(UploadJobError):
    """ The upload job reported a FAILED status """
class UploadJobCancelled(UploadJobFailed):
    """ The upload job reported a CANCELLED status """
class UploadJobClient(object):
    """ An upload job DTS client

    This class wraps the DTS create-upload-job RPC so that image upload jobs
    can be created either from a coroutine (create_job) or synchronously from
    another thread (create_job_threadsafe).
    """
    # NOTE(review): the text under review had lines missing inside this class.
    # The following were filled in from context and need confirming against
    # the real file:
    #   - the three assignments in __init__ (inferred from the self._log,
    #     self._loop and self._dts reads below)
    #   - the closing of the from_dict() literal (written flat here, exactly
    #     as the visible keys appear; it may be nested in the real file)
    #   - the `0, create_job_msg` arguments passed to query_rpc
    #   - the `def add_task():` wrapper handed to call_soon_threadsafe

    def __init__(self, log, loop, dts):
        """ Remember the collaborators used to create upload jobs

        Arguments:
            log - Logger handed on to every UploadJob that gets created
            loop - Event loop on which create_job is scheduled by
                   create_job_threadsafe
            dts - DTS handle used to issue the create-upload-job RPC
        """
        self._log = log
        self._loop = loop
        self._dts = dts

    def create_job(self, image_name, image_checksum, project, cloud_account_names=None):
        """ Create an image upload_job and return an UploadJob instance

        This is a generator-based coroutine: drive it with `yield from` or
        schedule it on the event loop.

        Arguments:
            image_name - The name of the image in the image catalog
            image_checksum - The checksum of the image in the catalog
            project - The project the job belongs to. Its `name` is sent in
                      the request and the object is handed to the UploadJob.
            cloud_account_names - Names of the cloud accounts to upload the image to.
                                  None uploads the image to all cloud accounts.

        Returns:
            An UploadJob instance

        Raises:
            UploadJobError: The RPC did not produce any response
        """
        create_job_msg = RwImageMgmtYang.CreateUploadJob.from_dict({
            "project_name": project.name,
            "image_name": image_name,
            "image_checksum": image_checksum,
        })

        # Leaving cloud_account unset is what requests "all cloud accounts".
        if cloud_account_names is not None:
            create_job_msg.cloud_account = cloud_account_names

        query_iter = yield from self._dts.query_rpc(
            "I,/rw-image-mgmt:create-upload-job",
            0,
            create_job_msg,
        )

        rpc_result = None
        for fut_resp in query_iter:
            rpc_result = (yield from fut_resp).result
            job_id = rpc_result.job_id

        # Without this guard an empty response set surfaced as an
        # UnboundLocalError on job_id instead of a meaningful error.
        if rpc_result is None:
            raise UploadJobError(
                "did not get a response when creating an upload job for image: %s"
                % image_name
            )

        return UploadJob(self._log, self._loop, self._dts, job_id, project)

    def create_job_threadsafe(self, image_name, image_checksum, project, cloud_account_names=None):
        """ A thread-safe, synchronous wrapper for create_job

        Schedules create_job on the event loop and blocks the calling thread
        until it finishes. It must not be called from the event loop's own
        thread: the loop would be blocked waiting on a task it can never run.

        Returns:
            The UploadJob instance produced by create_job

        Raises:
            Whatever create_job raised, or concurrent.futures.CancelledError
            if the scheduled task was cancelled.
        """
        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # Every outcome must resolve `future`; otherwise the thread
            # blocked in future.result() below would wait forever.
            if asyncio_future.cancelled():
                future.cancel()
                return

            error = asyncio_future.exception()
            if error is not None:
                future.set_exception(error)
                return

            future.set_result(asyncio_future.result())

        def add_task():
            # Runs on the event loop thread, where creating a task is safe.
            task = self._loop.create_task(
                self.create_job(image_name, image_checksum, project, cloud_account_names)
            )
            task.add_done_callback(on_done)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()
class UploadJob(object):
    """ A handle for an image upload job """
    # NOTE(review): the text under review had lines missing inside this class.
    # The following were filled in from context and need confirming against
    # the real file:
    #   - the log/loop/dts assignments in __init__ (inferred from their use)
    #   - the `while True:` polling loop (inferred from the trailing sleep)
    #   - logging the COMPLETED message and returning (the message was built
    #     but its use was not visible)
    #   - whether the response loop stops after the first response (no
    #     `break` was visible, so the last response wins here)
    #   - the `def add_task():` wrapper handed to call_soon_threadsafe
    #   - whether wait_until_complete carried a coroutine decorator (none was
    #     visible, so none is applied)

    def __init__(self, log, loop, dts, job_id, project):
        """ Remember the job identity and the collaborators used to poll it

        Arguments:
            log - Logger used for progress messages
            loop - Event loop used for polling delays and thread-safe scheduling
            dts - DTS handle used to read the job status
            job_id - Identifier of the upload job to track
            project - Project object whose add_project() qualifies the
                      job status xpath
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._job_id = job_id
        self._project = project

    def wait_until_complete(self):
        """ Wait until the upload job reaches a terminal state

        This is a generator-based coroutine. It reads the job status, returns
        when the status is COMPLETED, and otherwise sleeps for half a second
        before reading again.

        Raises:
            UploadJobError: A generic exception occurred in the upload job
            UploadJobFailed: The upload job failed
            UploadJobCancelled: The upload job was cancelled
        """
        self._log.debug("waiting for upload job %s to complete", self._job_id)
        xpath = self._project.add_project(
            "D,/rw-image-mgmt:upload-jobs/" +
            "rw-image-mgmt:job[rw-image-mgmt:id='{}']".format(self._job_id)
        )

        while True:
            query_iter = yield from self._dts.query_read(xpath)
            job_status_msg = None
            for fut_resp in query_iter:
                job_status_msg = (yield from fut_resp).result

            if job_status_msg is None:
                # Interpolate the id: passing it as a second exception
                # argument left the "%s" placeholder unformatted.
                raise UploadJobError(
                    "did not get a status response for job_id: %s" % self._job_id
                )

            if job_status_msg.status == "COMPLETED":
                msg = "upload job %s completed successfully" % self._job_id
                self._log.debug(msg)
                return

            elif job_status_msg.status == "FAILED":
                msg = "upload job %s as not successful: %s" % (self._job_id, job_status_msg.status)
                raise UploadJobFailed(msg)

            elif job_status_msg.status == "CANCELLED":
                msg = "upload job %s was cancelled" % self._job_id
                raise UploadJobCancelled(msg)

            # Any other status is treated as still in progress: poll again.
            yield from asyncio.sleep(.5, loop=self._loop)

    def wait_until_complete_threadsafe(self):
        """ A thread-safe, synchronous wrapper for wait_until_complete

        Schedules wait_until_complete on the event loop and blocks the calling
        thread until it finishes. It must not be called from the event loop's
        own thread: the loop would be blocked waiting on a task it can never
        run.

        Raises:
            Whatever wait_until_complete raised, or
            concurrent.futures.CancelledError if the scheduled task was
            cancelled.
        """
        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # Every outcome must resolve `future`; otherwise the thread
            # blocked in future.result() below would wait forever.
            if asyncio_future.cancelled():
                future.cancel()
                return

            error = asyncio_future.exception()
            if error is not None:
                future.set_exception(error)
                return

            future.set_result(asyncio_future.result())

        def add_task():
            # Runs on the event loop thread, where creating a task is safe.
            task = self._loop.create_task(self.wait_until_complete())
            task.add_done_callback(on_done)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()