10df45b928eee5ca439c2d49d0d43e1346874f1b
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / imagemgr / client.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import concurrent.futures
20
21 import gi
22 gi.require_version("RwImageMgmtYang", "1.0")
23 from gi.repository import (
24 RwImageMgmtYang,
25 )
26
27
class UploadJobError(Exception):
    """ Base class for the errors raised while working with an upload job """
30
31
class UploadJobFailed(UploadJobError):
    """ The upload job ended without completing successfully """
34
35
class UploadJobCancelled(UploadJobFailed):
    """ The upload job was cancelled

    Derives from UploadJobFailed, so handlers for a failed job also catch a
    cancelled one.
    """
38
39
class UploadJobClient(object):
    """ An upload job DTS client

    This class wraps the DTS upload job actions to be more easily reused across
    various components
    """
    def __init__(self, log, loop, dts):
        """ Store the collaborators used to issue upload job requests

        Arguments:
            log - Logger handed on to the UploadJob handles this client creates
            loop - Event loop on which create_job_threadsafe schedules work
            dts - DTS handle; only its query_rpc() method is used by this class
        """
        self._log = log
        self._loop = loop
        self._dts = dts

    # NOTE(review): this is a plain generator (it uses "yield from") and is not
    # wrapped with @asyncio.coroutine, unlike UploadJob.wait_until_complete.
    # Confirm whether that is intentional before relying on
    # asyncio.iscoroutinefunction() for it.
    def create_job(self, image_name, image_checksum, cloud_account_names=None):
        """ Create an image upload_job and return an UploadJob instance

        This is a generator-based coroutine: drive it with "yield from" or
        schedule it on the event loop.

        Arguments:
            image_name - The name of the image in the image catalog
            image_checksum - The checksum of the image in the catalog
            cloud_account_names - Names of the cloud accounts to upload the image to.
                                  None uploads the image to all cloud accounts.

        Returns:
            An UploadJob instance

        Raises:
            UploadJobError: The create-upload-job RPC produced no result
        """
        create_job_msg = RwImageMgmtYang.CreateUploadJob.from_dict({
            "onboarded_image": {
                "image_name": image_name,
                "image_checksum": image_checksum,
            }
        })

        if cloud_account_names is not None:
            create_job_msg.cloud_account = cloud_account_names

        query_iter = yield from self._dts.query_rpc(
            "I,/rw-image-mgmt:create-upload-job",
            0,
            create_job_msg,
        )

        # Start from None so that an empty response set is reported as an
        # UploadJobError instead of an UnboundLocalError further down.
        rpc_result = None
        for fut_resp in query_iter:
            # When several responses arrive, the last one wins.
            rpc_result = (yield from fut_resp).result

        if rpc_result is None:
            raise UploadJobError(
                "did not get a create-upload-job response for image: %s" % image_name
            )

        job_id = rpc_result.job_id

        return UploadJob(self._log, self._loop, self._dts, job_id)

    def create_job_threadsafe(self, image_name, image_checksum, cloud_account_names=None):
        """ A thread-safe, synchronous wrapper for create_job

        Schedules create_job() on the event loop and blocks the calling thread
        until it finishes. Must be called from a thread other than the one
        running the loop, otherwise the blocking wait can never be satisfied.

        Arguments:
            image_name - The name of the image in the image catalog
            image_checksum - The checksum of the image in the catalog
            cloud_account_names - Names of the cloud accounts to upload the image to.
                                  None uploads the image to all cloud accounts.

        Returns:
            Whatever create_job() returned

        Raises:
            concurrent.futures.CancelledError: The scheduled task was cancelled
            Exception: Any exception raised by create_job() is re-raised here
        """
        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # exception()/result() raise on a cancelled task, which would leave
            # the waiting thread blocked forever; forward the cancellation.
            if asyncio_future.cancelled():
                future.cancel()
                return

            exc = asyncio_future.exception()
            if exc is not None:
                future.set_exception(exc)
                return

            # Always resolve the future, even for a None result, so the caller
            # is guaranteed to wake up.
            future.set_result(asyncio_future.result())

        def add_task():
            try:
                task = self._loop.create_task(
                    self.create_job(image_name, image_checksum, cloud_account_names)
                )
            except Exception as exc:
                # Scheduling failed: report it rather than hanging the caller.
                future.set_exception(exc)
                return

            task.add_done_callback(on_done)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()
105
106
class UploadJob(object):
    """ A handle for a image upload job """
    def __init__(self, log, loop, dts, job_id):
        """ Bind the handle to an existing upload job

        Arguments:
            log - Logger used to report job progress and failures
            loop - Event loop used for polling delays and thread-safe scheduling
            dts - DTS handle; only its query_read() method is used by this class
            job_id - Identifier of the upload job to track
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._job_id = job_id

    @asyncio.coroutine
    def wait_until_complete(self):
        """ Wait until the upload job reaches a terminal state

        Polls the job status and returns once it is "COMPLETED". Any status
        other than COMPLETED, FAILED or CANCELLED causes another poll after a
        half second pause.

        Raises:
            UploadJobError: A generic exception occurred in the upload job
            UploadJobFailed: The upload job failed
            UploadJobCancelled: The upload job was cancelled
        """
        self._log.debug("waiting for upload job %s to complete", self._job_id)

        # The query path does not change between polls, so build it once.
        job_xpath = "D,/rw-image-mgmt:upload-jobs/rw-image-mgmt:job[rw-image-mgmt:id='{}']".format(
            self._job_id
        )

        while True:
            query_iter = yield from self._dts.query_read(job_xpath)

            # Only the first response is consulted.
            job_status_msg = None
            for fut_resp in query_iter:
                job_status_msg = (yield from fut_resp).result
                break

            if job_status_msg is None:
                # Format the message here: passing the job id as a second
                # constructor argument would leave the "%s" unexpanded.
                raise UploadJobError(
                    "did not get a status response for job_id: %s" % self._job_id
                )

            if job_status_msg.status == "COMPLETED":
                msg = "upload job %s completed successfully" % self._job_id
                self._log.debug(msg)
                return

            elif job_status_msg.status == "FAILED":
                msg = "upload job %s as not successful: %s" % (self._job_id, job_status_msg.status)
                self._log.error(msg)
                raise UploadJobFailed(msg)

            elif job_status_msg.status == "CANCELLED":
                msg = "upload job %s was cancelled" % self._job_id
                self._log.error(msg)
                raise UploadJobCancelled(msg)

            # Not in a terminal state yet: pause before polling again.
            yield from asyncio.sleep(.5, loop=self._loop)

    def wait_until_complete_threadsafe(self):
        """ A thread-safe, synchronous wrapper for wait_until_complete

        Schedules wait_until_complete() on the event loop and blocks the
        calling thread until it finishes. Must be called from a thread other
        than the one running the loop, otherwise the blocking wait can never
        be satisfied.

        Raises:
            concurrent.futures.CancelledError: The scheduled task was cancelled
            UploadJobError: Re-raised from wait_until_complete(), including its
                            UploadJobFailed and UploadJobCancelled subclasses
        """

        future = concurrent.futures.Future()

        def on_done(asyncio_future):
            # exception()/result() raise on a cancelled task, which would leave
            # the waiting thread blocked forever; forward the cancellation.
            if asyncio_future.cancelled():
                future.cancel()
                return

            exc = asyncio_future.exception()
            if exc is not None:
                future.set_exception(exc)
                return

            future.set_result(asyncio_future.result())

        def add_task():
            try:
                task = self._loop.create_task(self.wait_until_complete())
            except Exception as exc:
                # Scheduling failed: report it rather than hanging the caller.
                future.set_exception(exc)
                return

            task.add_done_callback(on_done)

        self._loop.call_soon_threadsafe(add_task)
        return future.result()