027e58212bafffe95ea0934888203624c8b4beb5
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import threading
21 import time
22
23 import rift.tasklets
24 import rift.mano.cloud
25
26 from . import glance_proxy_server
27 from . import glance_client
28 from . import upload
29
30 import gi
31 gi.require_version('RwImageMgmtYang', '1.0')
32 gi.require_version('RwLaunchpadYang', '1.0')
33 gi.require_version('RwDts', '1.0')
34
35 from gi.repository import (
36 RwcalYang,
37 RwDts as rwdts,
38 RwImageMgmtYang,
39 RwLaunchpadYang,
40 )
41
42
class ImageRequestError(Exception):
    """ Base class for errors raised while servicing an image request """
45
46
class AccountNotFoundError(ImageRequestError):
    """ A requested cloud account name is not among the known accounts """
49
50
class ImageNotFoundError(ImageRequestError):
    """ No matching image could be found in the image catalog """
53
54
class CloudAccountDtsHandler(object):
    """ Thin wrapper that creates and registers a
    rift.mano.cloud.CloudAccountConfigSubscriber """

    def __init__(self, log, dts, log_hdl):
        """
        Arguments:
            log - logger used for debug output
            dts - DTS api handle passed to the config subscriber
            log_hdl - log handle passed to the config subscriber
        """
        self._dts, self._log, self._log_hdl = dts, log, log_hdl

        # Populated by register()
        self._cloud_cfg_subscriber = None

    def register(self, on_add_apply, on_delete_apply):
        """ Create the cloud account config subscriber and register it

        Arguments:
            on_add_apply - callback handed to CloudAccountConfigCallbacks
            on_delete_apply - callback handed to CloudAccountConfigCallbacks
        """
        self._log.debug("creating cloud account config handler")

        callbacks = rift.mano.cloud.CloudAccountConfigCallbacks(
            on_add_apply=on_add_apply,
            on_delete_apply=on_delete_apply,
        )
        subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
            self._dts, self._log, self._log_hdl, callbacks
        )

        self._cloud_cfg_subscriber = subscriber
        subscriber.register()
72
73
def openstack_image_to_image_info(openstack_image):
    """Build an ImageInfo protobuf message from an OpenstackImage

    Arguments:
        openstack_image - A OpenstackImage instance

    Returns:
        A ImageInfo CAL protobuf message
    """
    image_info = RwcalYang.ImageInfoItem()

    # These attributes carry the same name on both objects
    for attr_name in ("id", "name", "checksum", "container_format", "disk_format"):
        setattr(image_info, attr_name, getattr(openstack_image, attr_name))

    # The source attribute is "status"; the message field is "state"
    image_info.state = openstack_image.status

    return image_info
94
95
class ImageDTSShowHandler(object):
    """ A DTS publisher for the upload-jobs data container """

    def __init__(self, log, loop, dts, job_controller):
        """
        Arguments:
            log - logger
            loop - asyncio event loop
            dts - DTS api handle used to register the publisher
            job_controller - object whose pb_msg attribute is returned on reads
        """
        self._log, self._loop, self._dts = log, loop, dts
        self._job_controller = job_controller

        # Registration handle, populated by register()
        self._subscriber = None

    @asyncio.coroutine
    def register(self):
        """ Register as a publisher and wait for reg_ready to complete """
        upload_jobs_xpath = "D,/rw-image-mgmt:upload-jobs"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            # Only READ queries are served; anything else is NACK'd
            if action != rwdts.QueryAction.READ:
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
            else:
                current_jobs = self._job_controller.pb_msg
                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    xpath=upload_jobs_xpath,
                    msg=current_jobs,
                )

        registration_done = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            registration_done.set()

        handler = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_prepare,
            on_ready=on_ready,
        )
        self._subscriber = yield from self._dts.register(
            xpath=upload_jobs_xpath,
            handler=handler,
            flags=rwdts.Flag.PUBLISHER,
        )

        # Block until the on_ready callback has fired
        yield from registration_done.wait()
142
143
class ImageDTSRPCHandler(object):
    """ A DTS publisher for the upload-job RPC's

    Two RPCs are registered:
        /rw-image-mgmt:create-upload-job
        /rw-image-mgmt:cancel-upload-job
    """
    def __init__(self, log, loop, dts, accounts, glance_client, upload_task_creator, job_controller):
        """
        Arguments:
            log - logger
            loop - asyncio event loop
            dts - DTS api handle used to register the RPC handlers
            accounts - mapping of cloud account name -> account
            glance_client - client used to delete temporary catalog images
            upload_task_creator - object that creates glance images and
                                  upload tasks for the create RPC
            job_controller - object that creates and looks up upload jobs
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._accounts = accounts
        self._glance_client = glance_client
        self._upload_task_creator = upload_task_creator
        self._job_controller = job_controller

        # Most recent registration handle (kept for backward compatibility)
        self._subscriber = None
        # Every registration handle, so that registering the second RPC
        # does not drop the only reference to the first one.
        self._subscribers = []

    @asyncio.coroutine
    def _register_create_upload_job(self):
        """ Register the create-upload-job RPC and wait until it is ready """
        def get_xpath():
            return "/rw-image-mgmt:create-upload-job"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ Create an upload job and respond with its job id

            Raises:
                AccountNotFoundError - a requested cloud account is unknown
                ImageRequestError - neither external_url nor onboarded_image
                                    was provided
            """
            create_msg = msg

            account_names = create_msg.cloud_account
            # If cloud accounts were not specified, upload image to all cloud account
            if not account_names:
                account_names = list(self._accounts.keys())

            for account_name in account_names:
                if account_name not in self._accounts:
                    # Interpolate here: exception constructors do not apply
                    # %-formatting to their arguments.
                    raise AccountNotFoundError(
                        "Could not find account %s" % account_name
                    )

            if create_msg.has_field("external_url"):
                glance_image = yield from self._upload_task_creator.create_glance_image_from_url_create_rpc(
                    account_names, create_msg.external_url
                )

                tasks = yield from self._upload_task_creator.create_tasks_from_glance_id(
                    account_names, glance_image.id
                )

                def delete_image(ft):
                    # Best effort cleanup: a failed delete must not fail the
                    # job, but it should not go unnoticed either.
                    try:
                        self._glance_client.delete_image_from_id(glance_image.id)
                    except glance_client.OpenstackImageDeleteError as e:
                        self._log.warning(
                            "failed to delete temporary image %s from catalog: %s",
                            glance_image.id, str(e)
                        )

                # Create a job and when the job completes delete the temporary
                # image from the catalog.
                job_id = self._job_controller.create_job(
                    tasks,
                    on_completed=delete_image
                )

            elif create_msg.has_field("onboarded_image"):
                tasks = yield from self._upload_task_creator.create_tasks_from_onboarded_create_rpc(
                    account_names, create_msg.onboarded_image
                )
                job_id = self._job_controller.create_job(tasks)

            else:
                raise ImageRequestError("an image selection must be provided")

            rpc_out_msg = RwImageMgmtYang.CreateUploadJobOutput(job_id=job_id)

            xact_info.respond_xpath(
                rwdts.XactRspCode.ACK,
                xpath="O," + get_xpath(),
                msg=rpc_out_msg,
            )

        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(_, status):
            reg_event.set()

        regh = yield from self._dts.register(
            xpath="I," + get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare,
                on_ready=on_ready,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )
        self._subscriber = regh
        self._subscribers.append(regh)

        yield from reg_event.wait()

    @asyncio.coroutine
    def _register_cancel_upload_job(self):
        """ Register the cancel-upload-job RPC and wait until it is ready """
        def get_xpath():
            return "/rw-image-mgmt:cancel-upload-job"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ Stop the job named by msg.job_id; NACK if job_id is missing """
            if not msg.has_field("job_id"):
                self._log.error("cancel-upload-job missing job-id field.")
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                return

            job_id = msg.job_id

            job = self._job_controller.get_job(job_id)
            job.stop()

            xact_info.respond_xpath(
                rwdts.XactRspCode.ACK,
                xpath="O," + get_xpath(),
            )

        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(_, status):
            reg_event.set()

        regh = yield from self._dts.register(
            xpath="I," + get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare,
                on_ready=on_ready,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )
        self._subscriber = regh
        self._subscribers.append(regh)

        yield from reg_event.wait()

    @asyncio.coroutine
    def register(self):
        """ Register for RPC's and wait for all registrations to complete """
        yield from self._register_create_upload_job()
        yield from self._register_cancel_upload_job()
275
276
class GlanceClientUploadTaskCreator(object):
    """ This class creates upload tasks using configured cloud accounts and
    configured image catalog glance client """

    # Seconds to wait for a glance client call run in the loop's executor
    GLANCE_REQUEST_TIMEOUT_SECS = 5

    def __init__(self, log, loop, accounts, glance_client):
        """
        Arguments:
            log - logger
            loop - asyncio event loop
            accounts - mapping of cloud account name -> account
            glance_client - image catalog glance client
        """
        self._log = log
        self._loop = loop
        self._accounts = accounts
        self._glance_client = glance_client

    @asyncio.coroutine
    def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None):
        """ Create a list of UploadTasks for a list of cloud accounts
        and a image with a matching image_name and image_checksum in the
        catalog

        Arguments:
            account_names - A list of configured cloud account names
            image_id - A image id
            image_name - A image name
            image_checksum - A image checksum

        Returns:
            A list of AccountImageUploadTask instances

        Raises:
            ImageNotFoundError - Could not find a matching image in the
                                 image catalog

            AccountNotFoundError - Could not find an account that matched
                                   the provided account name
        """
        try:
            # The glance lookup is a blocking call, so it runs in the loop's
            # default executor with a bounded wait.
            image = yield from asyncio.wait_for(
                self._loop.run_in_executor(
                    None,
                    self._glance_client.find_active_image,
                    image_id,
                    image_name,
                    image_checksum,
                ),
                timeout=self.GLANCE_REQUEST_TIMEOUT_SECS,
                loop=self._loop,
            )

        except glance_client.OpenstackImageError as e:
            msg = "Could not find image in Openstack to upload"
            self._log.exception(msg)
            raise ImageNotFoundError(msg) from e

        image_info = openstack_image_to_image_info(image)
        self._log.debug("created image info: %s", image_info)

        # Validate every account up front so that no pipe is started when
        # one of the requested accounts is unknown.
        for account_name in account_names:
            if account_name not in self._accounts:
                # Interpolate here: exception constructors do not apply
                # %-formatting to their arguments.
                raise AccountNotFoundError(
                    "Could not find account %s" % account_name
                )

        tasks = []
        # For each account name provided, create a pipe (GlanceImagePipeGen)
        # which feeds data into the UploadTask while also monitoring the various
        # transmit stats (progress, bytes written, bytes per second, etc)
        for account_name in account_names:
            account = self._accounts[account_name]
            self._log.debug("creating task for account %s", account.name)
            glance_data_gen = self._glance_client.get_image_data(image_info.id)

            pipe_gen = upload.GlanceImagePipeGen(self._log, self._loop, glance_data_gen)
            progress_pipe = upload.UploadProgressWriteProxy(
                self._log, self._loop, image.size, pipe_gen.write_hdl
            )
            progress_pipe.start_rate_monitoring()
            # Route the pipe's writes through the progress proxy
            pipe_gen.write_hdl = progress_pipe
            pipe_gen.start()

            task = upload.AccountImageUploadTask(
                self._log, self._loop, account, image_info, pipe_gen.read_hdl,
                progress_info=progress_pipe, write_canceller=pipe_gen,
            )
            tasks.append(task)
            self._log.debug("task created: %s", task)

        return tasks

    @asyncio.coroutine
    def create_glance_image_from_url_create_rpc(self, account_names, create_msg):
        """ Create an image in the catalog from the url in a create RPC

        Arguments:
            account_names - A list of cloud account names (not used here)
            create_msg - The external_url portion of the create RPC input

        Returns:
            The image returned by the glance client's create_image_from_url

        Raises:
            ValueError - image_url is missing
            ImageRequestError - image_id was also given, or image_name
                                is missing
        """
        if "image_url" not in create_msg:
            raise ValueError("image_url must be specified")

        if "image_id" in create_msg:
            raise ImageRequestError("Cannot specify both image_url and image_id")

        if "image_name" not in create_msg:
            raise ImageRequestError("image_name must be specified when image_url is provided")

        glance_image = yield from asyncio.wait_for(
            self._loop.run_in_executor(
                None,
                self._glance_client.create_image_from_url,
                create_msg.image_url,
                create_msg.image_name,
                create_msg.image_checksum if "image_checksum" in create_msg else None,
                create_msg.disk_format if "disk_format" in create_msg else None,
                create_msg.container_format if "container_format" in create_msg else None,
            ),
            timeout=self.GLANCE_REQUEST_TIMEOUT_SECS,
            loop=self._loop,
        )

        return glance_image

    @asyncio.coroutine
    def create_tasks_from_glance_id(self, account_names, glance_image_id):
        """ Create upload tasks for the catalog image with the given id """
        return (yield from self.create_tasks(account_names, glance_image_id))

    @asyncio.coroutine
    def create_tasks_from_onboarded_create_rpc(self, account_names, create_msg):
        """ Create upload tasks from the onboarded_image portion of a create
        RPC; fields absent from the message are passed as None """
        return (yield from self.create_tasks(
            account_names,
            create_msg.image_id if "image_id" in create_msg else None,
            create_msg.image_name if "image_name" in create_msg else None,
            create_msg.image_checksum if "image_checksum" in create_msg else None)
        )
399
400
class ImageManagerTasklet(rift.tasklets.Tasklet):
    """
    The ImageManagerTasklet wires the image upload machinery into DTS: it
    tracks configured cloud accounts, starts the glance HTTP proxy server,
    and registers the upload-job RPC and upload-jobs show handlers.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")

        # Everything below except cloud_accounts is populated in
        # start() / init()
        self.cloud_cfg_subscriber = None
        self.http_proxy = None
        self.proxy_server = None
        self.dts = None
        self.job_controller = None
        # name -> account.  The same dict object is handed to the task
        # creator and the RPC handler, so it must only be mutated in place.
        self.cloud_accounts = {}
        self.glance_client = None
        self.task_creator = None
        self.rpc_handler = None
        self.show_handler = None

    def start(self):
        """ Start the tasklet and create the DTS api handle """
        super().start()
        self.log.info("Starting Image Manager Tasklet")

        self.log.debug("Registering with dts")
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwImageMgmtYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """ Deinitialize DTS; any failure is logged rather than raised """
        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """ Create and register all handlers for the tasklet

        Any exception is logged and swallowed so that the DTS state
        transition in on_dts_state_change still takes place.
        """
        try:
            self.log.debug("creating cloud account handler")
            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self.log, self.dts, self.log_hdl)
            self.cloud_cfg_subscriber.register(
                self.on_cloud_account_create,
                self.on_cloud_account_delete
            )

            self.log.debug("creating http proxy server")

            self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop)

            self.proxy_server = glance_proxy_server.GlanceHTTPProxyServer(
                self.log, self.loop, self.http_proxy
            )
            self.proxy_server.start()

            self.job_controller = upload.ImageUploadJobController(
                self.log, self.loop
            )

            self.glance_client = glance_client.OpenstackGlanceClient.from_token(
                self.log, "127.0.0.1", "9292", "test"
            )

            self.task_creator = GlanceClientUploadTaskCreator(
                self.log, self.loop, self.cloud_accounts, self.glance_client
            )

            self.rpc_handler = ImageDTSRPCHandler(
                self.log, self.loop, self.dts, self.cloud_accounts, self.glance_client, self.task_creator,
                self.job_controller
            )
            yield from self.rpc_handler.register()

            self.show_handler = ImageDTSShowHandler(
                self.log, self.loop, self.dts, self.job_controller
            )
            yield from self.show_handler.register()

        except Exception:
            self.log.exception("error during init")

    def on_cloud_account_create(self, account):
        """ Remember a newly configured cloud account under its name """
        self.log.debug("adding cloud account: %s", account.name)
        self.cloud_accounts[account.name] = account

    def on_cloud_account_delete(self, account_name):
        """ Forget a cloud account; unknown names are logged and ignored """
        self.log.debug("deleting cloud account: %s", account_name)
        if account_name not in self.cloud_accounts:
            self.log.warning("cloud account not found: %s", account_name)
            # Nothing to delete; falling through would raise KeyError
            return

        del self.cloud_accounts[account_name]

    @asyncio.coroutine
    def run(self):
        """ Nothing to do when entering the RUN state """
        pass

    def on_instance_started(self):
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        # DTS state -> DTS state to request once the handler has run
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # DTS state -> application coroutine to run for that state
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.dts.handle.set_state(next_state)