6dcabac63b5f86fcb32171f4f5754f795e4365aa
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / imagemgr / client.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import concurrent.futures
20 import gi
21
22 from rift.mano.utils.project import ManoProject
23
24 gi.require_version("RwImageMgmtYang", "1.0")
25 from gi.repository import (
26 RwImageMgmtYang,
27 )
28 gi.require_version('RwKeyspec', '1.0')
29 from gi.repository.RwKeyspec import quoted_key
30
31
32 class UploadJobError(Exception):
33 pass
34
35
36 class UploadJobFailed(UploadJobError):
37 pass
38
39
40 class UploadJobCancelled(UploadJobFailed):
41 pass
42
43
44 class UploadJobClient(object):
45 """ An upload job DTS client
46
47 This class wraps the DTS upload job actions to be more easily reused across
48 various components
49 """
50 def __init__(self, log, loop, dts):
51 self._log = log
52 self._loop = loop
53 self._dts = dts
54
55 def create_job(self, image_name, image_checksum, project, cloud_account_names=None):
56 """ Create an image upload_job and return an UploadJob instance
57
58 Arguments:
59 image_name - The name of the image in the image catalog
60 image_checksum - The checksum of the image in the catalog
61 cloud_account_names - Names of the cloud accounts to upload the image to.
62 None uploads the image to all cloud accounts.
63
64 Returns:
65 An UploadJob instance
66 """
67 self._log.debug("Project {}: Create image upload job for image {} to {}".
68 format(project, image_name, cloud_account_names))
69
70 create_job_msg = RwImageMgmtYang.YangInput_RwImageMgmt_CreateUploadJob.from_dict({
71 "project_name": project,
72 "onboarded_image": {
73 "image_name": image_name,
74 "image_checksum": image_checksum,
75 }
76 })
77
78 if cloud_account_names is not None:
79 create_job_msg.cloud_account = cloud_account_names
80
81 query_iter = yield from self._dts.query_rpc(
82 "I,/rw-image-mgmt:create-upload-job",
83 0,
84 create_job_msg,
85 )
86
87 for fut_resp in query_iter:
88 rpc_result = (yield from fut_resp).result
89
90 job_id = rpc_result.job_id
91
92 return UploadJob(self._log, self._loop, self._dts, job_id, project)
93
94 def create_job_threadsafe(self, image_name, image_checksum, project, cloud_account_names=None):
95 """ A thread-safe, syncronous wrapper for create_job """
96 future = concurrent.futures.Future()
97
98 def on_done(asyncio_future):
99 if asyncio_future.exception() is not None:
100 future.set_exception(asyncio_future.exception())
101
102 elif asyncio_future.result() is not None:
103 future.set_result(asyncio_future.result())
104
105 def add_task():
106 task = self._loop.create_task(
107 self.create_job(image_name, image_checksum, project, cloud_account_names)
108 )
109 task.add_done_callback(on_done)
110
111 self._loop.call_soon_threadsafe(add_task)
112 return future.result()
113
114
115 class UploadJob(object):
116 """ A handle for a image upload job """
117 def __init__(self, log, loop, dts, job_id, project):
118 self._log = log
119 self._loop = loop
120 self._dts = dts
121 self._job_id = job_id
122 self._project = project
123
124 @asyncio.coroutine
125 def wait_until_complete(self):
126 """ Wait until the upload job reaches a terminal state
127
128 Raises:
129 UploadJobError: A generic exception occured in the upload job
130 UploadJobFailed: The upload job failed
131 UploadJobCancelled: The upload job was cancelled
132 """
133 self._log.debug("waiting for upload job %s to complete", self._job_id)
134 xpath = ManoProject.prefix_project("D,/rw-image-mgmt:upload-jobs/" +
135 "rw-image-mgmt:job[rw-image-mgmt:id={}]".
136 format(quoted_key(str(self._job_id))),
137 project=self._project,
138 log=self._log)
139
140 while True:
141 query_iter = yield from self._dts.query_read(xpath)
142 job_status_msg = None
143 for fut_resp in query_iter:
144 job_status_msg = (yield from fut_resp).result
145 break
146
147 if job_status_msg is None:
148 raise UploadJobError("did not get a status response for job_id: %s",
149 self._job_id)
150
151 if job_status_msg.status == "COMPLETED":
152 msg = "upload job %s completed successfully" % self._job_id
153 self._log.debug(msg)
154 return
155
156 elif job_status_msg.status == "FAILED":
157 msg = "upload job %s as not successful: %s" % (self._job_id, job_status_msg.status)
158 self._log.error(msg)
159 raise UploadJobFailed(msg)
160
161 elif job_status_msg.status == "CANCELLED":
162 msg = "upload job %s was cancelled" % self._job_id
163 self._log.error(msg)
164 raise UploadJobCancelled(msg)
165
166 yield from asyncio.sleep(.5, loop=self._loop)
167
168 def wait_until_complete_threadsafe(self):
169 """ A thread-safe, synchronous wrapper for wait_until_complete """
170
171 future = concurrent.futures.Future()
172
173 def on_done(asyncio_future):
174 if asyncio_future.exception() is not None:
175 future.set_exception(asyncio_future.exception())
176 return
177
178 future.set_result(asyncio_future.result())
179
180 def add_task():
181 task = self._loop.create_task(self.wait_until_complete())
182 task.add_done_callback(on_done)
183
184 self._loop.call_soon_threadsafe(add_task)
185 return future.result()