Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / imagemgr / client.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import concurrent.futures
20
21 import gi
22 gi.require_version("RwImageMgmtYang", "1.0")
23 from gi.repository import (
24 RwImageMgmtYang,
25 )
26
27
28 class UploadJobError(Exception):
29 pass
30
31
32 class UploadJobFailed(UploadJobError):
33 pass
34
35
36 class UploadJobCancelled(UploadJobFailed):
37 pass
38
39
40 class UploadJobClient(object):
41 """ An upload job DTS client
42
43 This class wraps the DTS upload job actions to be more easily reused across
44 various components
45 """
46 def __init__(self, log, loop, dts):
47 self._log = log
48 self._loop = loop
49 self._dts = dts
50
51 def create_job(self, image_name, image_checksum, project, cloud_account_names=None):
52 """ Create an image upload_job and return an UploadJob instance
53
54 Arguments:
55 image_name - The name of the image in the image catalog
56 image_checksum - The checksum of the image in the catalog
57 cloud_account_names - Names of the cloud accounts to upload the image to.
58 None uploads the image to all cloud accounts.
59
60 Returns:
61 An UploadJob instance
62 """
63 create_job_msg = RwImageMgmtYang.CreateUploadJob.from_dict({
64 "onboarded_image": {
65 "project_name": project.name,
66 "image_name": image_name,
67 "image_checksum": image_checksum,
68 }
69 })
70
71 if cloud_account_names is not None:
72 create_job_msg.cloud_account = cloud_account_names
73
74 query_iter = yield from self._dts.query_rpc(
75 "I,/rw-image-mgmt:create-upload-job",
76 0,
77 create_job_msg,
78 )
79
80 for fut_resp in query_iter:
81 rpc_result = (yield from fut_resp).result
82
83 job_id = rpc_result.job_id
84
85 return UploadJob(self._log, self._loop, self._dts, job_id, project)
86
87 def create_job_threadsafe(self, image_name, image_checksum, project, cloud_account_names=None):
88 """ A thread-safe, syncronous wrapper for create_job """
89 future = concurrent.futures.Future()
90
91 def on_done(asyncio_future):
92 if asyncio_future.exception() is not None:
93 future.set_exception(asyncio_future.exception())
94
95 elif asyncio_future.result() is not None:
96 future.set_result(asyncio_future.result())
97
98 def add_task():
99 task = self._loop.create_task(
100 self.create_job(image_name, image_checksum, project, cloud_account_names)
101 )
102 task.add_done_callback(on_done)
103
104 self._loop.call_soon_threadsafe(add_task)
105 return future.result()
106
107
108 class UploadJob(object):
109 """ A handle for a image upload job """
110 def __init__(self, log, loop, dts, job_id, project):
111 self._log = log
112 self._loop = loop
113 self._dts = dts
114 self._job_id = job_id
115 self._project = project
116
117 @asyncio.coroutine
118 def wait_until_complete(self):
119 """ Wait until the upload job reaches a terminal state
120
121 Raises:
122 UploadJobError: A generic exception occured in the upload job
123 UploadJobFailed: The upload job failed
124 UploadJobCancelled: The upload job was cancelled
125 """
126 self._log.debug("waiting for upload job %s to complete", self._job_id)
127 xpath = self._project.add_project("D,/rw-image-mgmt:upload-jobs/" +
128 "rw-image-mgmt:job[rw-image-mgmt:id='{}']".
129 format(self._job_id))
130 while True:
131 query_iter = yield from self._dts.query_read(xpath)
132 job_status_msg = None
133 for fut_resp in query_iter:
134 job_status_msg = (yield from fut_resp).result
135 break
136
137 if job_status_msg is None:
138 raise UploadJobError("did not get a status response for job_id: %s",
139 self._job_id)
140
141 if job_status_msg.status == "COMPLETED":
142 msg = "upload job %s completed successfully" % self._job_id
143 self._log.debug(msg)
144 return
145
146 elif job_status_msg.status == "FAILED":
147 msg = "upload job %s as not successful: %s" % (self._job_id, job_status_msg.status)
148 self._log.error(msg)
149 raise UploadJobFailed(msg)
150
151 elif job_status_msg.status == "CANCELLED":
152 msg = "upload job %s was cancelled" % self._job_id
153 self._log.error(msg)
154 raise UploadJobCancelled(msg)
155
156 yield from asyncio.sleep(.5, loop=self._loop)
157
158 def wait_until_complete_threadsafe(self):
159 """ A thread-safe, synchronous wrapper for wait_until_complete """
160
161 future = concurrent.futures.Future()
162
163 def on_done(asyncio_future):
164 if asyncio_future.exception() is not None:
165 future.set_exception(asyncio_future.exception())
166 return
167
168 future.set_result(asyncio_future.result())
169
170 def add_task():
171 task = self._loop.create_task(self.wait_until_complete())
172 task.add_done_callback(on_done)
173
174 self._loop.call_soon_threadsafe(add_task)
175 return future.result()