Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import threading
21 import time
22
23 import rift.tasklets
24 import rift.mano.cloud
25 from rift.mano.utils.project import (
26 ManoProject,
27 ProjectConfigCallbacks,
28 ProjectHandler,
29 get_add_delete_update_cfgs,
30 DEFAULT_PROJECT,
31 )
32
33 from . import glance_proxy_server
34 from . import glance_client
35 from . import upload
36
37 import gi
38 gi.require_version('RwImageMgmtYang', '1.0')
39 gi.require_version('RwLaunchpadYang', '1.0')
40 gi.require_version('RwDts', '1.0')
41
42 from gi.repository import (
43 RwcalYang,
44 RwDts as rwdts,
45 RwImageMgmtYang,
46 RwLaunchpadYang,
47 )
48
49
class ImageRequestError(Exception):
    """Base class for the image request errors raised by this module."""
52
53
class AccountNotFoundError(ImageRequestError):
    """Raised when a requested cloud account name is not configured."""
56
57
class ImageNotFoundError(ImageRequestError):
    """Raised when no matching image can be found in the image catalog."""
60
61
class CloudAccountDtsHandler(object):
    """Owns the cloud account config subscription of one project.

    Wraps ``rift.mano.cloud.CloudAccountConfigSubscriber`` so the caller only
    has to supply the add/delete callbacks.
    """

    def __init__(self, log, dts, log_hdl, project):
        """Store the handles needed to create the subscriber later.

        Arguments:
            log - logger used for debug output
            dts - DTS handle passed through to the subscriber
            log_hdl - log handle passed through to the subscriber
            project - project object; its ``name`` is used in log messages
        """
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        # Created by register(); None while no subscription is active.
        self._cloud_cfg_subscriber = None
        self._project = project

    def register(self, on_add_apply, on_delete_apply):
        """Create the cloud account config subscriber and register it.

        Arguments:
            on_add_apply - callback handed to CloudAccountConfigCallbacks
            on_delete_apply - callback handed to CloudAccountConfigCallbacks
        """
        self._log.debug("Project {}: creating cloud account config handler".
                        format(self._project.name))
        self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
            self._dts, self._log, self._log_hdl, self._project,
            rift.mano.cloud.CloudAccountConfigCallbacks(
                on_add_apply=on_add_apply,
                on_delete_apply=on_delete_apply,
            )
        )
        self._cloud_cfg_subscriber.register()

    def deregister(self):
        """Deregister the subscriber, if one was created.

        Safe to call when register() was never called (or failed before the
        subscriber was stored) and safe to call more than once.
        """
        self._log.debug("Project {}: Removing cloud account config handler".
                        format(self._project.name))
        if self._cloud_cfg_subscriber is None:
            return

        self._cloud_cfg_subscriber.deregister()
        # Drop the reference so a repeated deregister() is a no-op, matching
        # the other handlers in this module.
        self._cloud_cfg_subscriber = None
86
87
def openstack_image_to_image_info(openstack_image):
    """Build an ImageInfoItem CAL message from an OpenstackImage.

    Arguments:
        openstack_image - A OpenstackImage instance

    Returns:
        A RwcalYang.ImageInfoItem whose id, name, checksum, container_format
        and disk_format are copied from the image and whose state is the
        image's status.
    """
    info = RwcalYang.ImageInfoItem()

    # These attributes carry the same name on both objects.
    for attr_name in ("id", "name", "checksum", "container_format", "disk_format"):
        setattr(info, attr_name, getattr(openstack_image, attr_name))

    # The openstack "status" is exposed as "state" on the CAL message.
    info.state = openstack_image.status
    return info
108
109
class ImageDTSShowHandler(object):
    """ DTS publisher serving reads of the upload-jobs data container """

    def __init__(self, log, loop, dts, job_controller, project):
        """Keep the handles used later by register() and deregister().

        Arguments:
            log - logger
            loop - asyncio event loop used for the registration event
            dts - DTS handle used to register the publisher
            job_controller - object whose ``pb_msg`` is returned to readers
            project - project object used to qualify the xpath
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._job_controller = job_controller
        self._project = project

        # DTS registration handle; stays None until register() has run.
        self._subscriber = None

    def get_xpath(self):
        """Return the upload-jobs xpath as produced by the project's add_project()."""
        return self._project.add_project("D,/rw-image-mgmt:upload-jobs")

    @asyncio.coroutine
    def register(self):
        """ Register as a publisher and wait until the registration is ready """
        ready = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            # Wakes up the wait() at the end of register().
            ready.set()

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            # Only READ queries are answered with data; anything else is NACKed.
            if action == rwdts.QueryAction.READ:
                current_jobs = self._job_controller.pb_msg
                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    xpath=self.get_xpath(),
                    msg=current_jobs,
                )
            else:
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)

        callbacks = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_prepare,
            on_ready=on_ready,
        )
        self._subscriber = yield from self._dts.register(
            xpath=self.get_xpath(),
            handler=callbacks,
            flags=rwdts.Flag.PUBLISHER,
        )

        yield from ready.wait()

    def deregister(self):
        """Delete the published element and drop the DTS registration, if any."""
        self._log.debug("Project {}: De-register show image handler".
                        format(self._project.name))
        if not self._subscriber:
            return

        self._subscriber.delete_element(self.get_xpath())
        self._subscriber.deregister()
        self._subscriber = None
167
class ImageDTSRPCHandler(object):
    """ A DTS publisher for the upload-job RPC's

    Handles two RPCs: create-upload-job and cancel-upload-job.
    """

    def __init__(self, log, loop, dts, accounts, glance_client,
                 upload_task_creator, job_controller, project):
        """Store collaborators; nothing is registered until register().

        Arguments:
            log - logger
            loop - asyncio event loop used for the registration events
            dts - DTS handle used to register the RPC endpoints
            accounts - mapping of cloud account name to account object
            glance_client - catalog client used to delete temporary images
            upload_task_creator - creates glance images and upload tasks
            job_controller - creates and looks up upload jobs
            project - project object providing rpc_check() and name
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._accounts = accounts
        self._glance_client = glance_client
        self._upload_task_creator = upload_task_creator
        self._job_controller = job_controller
        self._project = project

        # DTS registration handles; None until the matching RPC is registered.
        self._create = None
        self._cancel = None

    @asyncio.coroutine
    def _register_create_upload_job(self):
        """Register the create-upload-job RPC and wait until it is ready."""

        def get_xpath():
            return "/rw-image-mgmt:create-upload-job"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            create_msg = msg

            # Nothing further is done when rpc_check() returns a falsy value
            # (presumably the RPC is not addressed to this project - confirm
            # against ManoProject.rpc_check).
            if not self._project.rpc_check(msg, xact_info):
                return

            account_names = create_msg.cloud_account

            # If cloud accounts were not specified, upload the image to all
            # cloud accounts
            if not account_names:
                account_names = list(self._accounts.keys())

            for account_name in account_names:
                if account_name not in self._accounts:
                    # Interpolate explicitly: exceptions do not apply
                    # %-formatting to their arguments.
                    raise AccountNotFoundError(
                        "Could not find account %s" % account_name
                    )

            if create_msg.has_field("external_url"):
                glance_image = yield from self._upload_task_creator.create_glance_image_from_url_create_rpc(
                    account_names, create_msg.external_url
                )

                tasks = yield from self._upload_task_creator.create_tasks_from_glance_id(
                    account_names, glance_image.id
                )

                def delete_image(ft):
                    # Best effort cleanup: a failed delete must not fail the
                    # job, but leave a trace so the leftover image is visible.
                    try:
                        self._glance_client.delete_image_from_id(glance_image.id)
                    except glance_client.OpenstackImageDeleteError:
                        self._log.warning(
                            "Failed to delete temporary image %s from the catalog",
                            glance_image.id,
                        )

                # Create a job and when the job completes delete the temporary
                # image from the catalog.
                job_id = self._job_controller.create_job(
                    tasks,
                    on_completed=delete_image
                )

            elif create_msg.has_field("onboarded_image"):
                tasks = yield from self._upload_task_creator.create_tasks_from_onboarded_create_rpc(
                    account_names, create_msg.onboarded_image
                )
                job_id = self._job_controller.create_job(tasks)

            else:
                raise ImageRequestError("an image selection must be provided")

            rpc_out_msg = RwImageMgmtYang.CreateUploadJobOutput(job_id=job_id)

            xact_info.respond_xpath(
                rwdts.XactRspCode.ACK,
                xpath="O," + get_xpath(),
                msg=rpc_out_msg,
            )

        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(_, status):
            reg_event.set()

        self._create = yield from self._dts.register(
            xpath="I," + get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare,
                on_ready=on_ready,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

        yield from reg_event.wait()

    @asyncio.coroutine
    def _register_cancel_upload_job(self):
        """Register the cancel-upload-job RPC and wait until it is ready."""

        def get_xpath():
            return "/rw-image-mgmt:cancel-upload-job"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            if not self._project.rpc_check(msg, xact_info):
                return

            if not msg.has_field("job_id"):
                self._log.error("cancel-upload-job missing job-id field.")
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                return

            job_id = msg.job_id

            # NOTE(review): behaviour for an unknown job_id depends on
            # get_job(), which is not visible here - confirm it raises rather
            # than returning None.
            job = self._job_controller.get_job(job_id)
            job.stop()

            xact_info.respond_xpath(
                rwdts.XactRspCode.ACK,
                xpath="O," + get_xpath(),
            )

        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(_, status):
            reg_event.set()

        self._cancel = yield from self._dts.register(
            xpath="I," + get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare,
                on_ready=on_ready,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

        yield from reg_event.wait()

    @asyncio.coroutine
    def register(self):
        """ Register for RPC's and wait for all registrations to complete """
        yield from self._register_create_upload_job()
        yield from self._register_cancel_upload_job()

    def deregister(self):
        """Deregister whichever RPC registrations are currently held."""
        self._log.debug("Project {}: Deregister image rpc handlers".
                        format(self._project.name))
        if self._create:
            self._create.deregister()
            self._create = None

        if self._cancel:
            self._cancel.deregister()
            self._cancel = None
320
321
class GlanceClientUploadTaskCreator(object):
    """ This class creates upload tasks using configured cloud accounts and
    configured image catalog glance client """

    def __init__(self, log, loop, accounts, glance_client, project):
        """Store collaborators.

        Arguments:
            log - logger
            loop - asyncio event loop; blocking glance calls run in its
                   default executor
            accounts - mapping of cloud account name to account object
            glance_client - image catalog client
            project - owning project object
        """
        self._log = log
        self._loop = loop
        self._accounts = accounts
        self._glance_client = glance_client
        self._project = project

    @asyncio.coroutine
    def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None):
        """ Create a list of UploadTasks for a list of cloud accounts
        and a image with a matching image_name and image_checksum in the
        catalog

        Arguments:
            account_names - A list of configured cloud account names
            image_id - A image id
            image_name - A image name
            image_checksum - A image checksum

        Returns:
            A list of AccountImageUploadTask instances

        Raises:
            ImageNotFoundError - Could not find a matching image in the
                                 image catalog

            AccountNotFoundError - Could not find an account that matched
                                   the provided account name
        """
        try:
            # The catalog lookup is blocking, so run it in the executor and
            # give up after 5 seconds.
            image = yield from asyncio.wait_for(
                self._loop.run_in_executor(
                    None,
                    self._glance_client.find_active_image,
                    image_id,
                    image_name,
                    image_checksum,
                ),
                timeout=5,
                loop=self._loop,
            )

        except glance_client.OpenstackImageError as e:
            msg = "Could not find image in Openstack to upload"
            self._log.exception(msg)
            raise ImageNotFoundError(msg) from e

        image_info = openstack_image_to_image_info(image)
        self._log.debug("created image info: %s", image_info)

        # Validate every account before any pipe is started so an unknown
        # name does not leave earlier pipes running.
        for account_name in account_names:
            if account_name not in self._accounts:
                # Interpolate explicitly: exceptions do not apply
                # %-formatting to their arguments.
                raise AccountNotFoundError(
                    "Could not find account %s" % account_name
                )

        tasks = []
        # For each account name provided, create a pipe (GlanceImagePipeGen)
        # which feeds data into the UploadTask while also monitoring the various
        # transmit stats (progress, bytes written, bytes per second, etc)
        for account_name in account_names:
            account = self._accounts[account_name]
            self._log.debug("creating task for account %s", account.name)
            glance_data_gen = self._glance_client.get_image_data(image_info.id)

            pipe_gen = upload.GlanceImagePipeGen(self._log, self._loop, glance_data_gen)
            progress_pipe = upload.UploadProgressWriteProxy(
                self._log, self._loop, image.size, pipe_gen.write_hdl
            )
            progress_pipe.start_rate_monitoring()
            # Route writes through the progress proxy before starting the pipe.
            pipe_gen.write_hdl = progress_pipe
            pipe_gen.start()

            task = upload.AccountImageUploadTask(
                self._log, self._loop, account, image_info, pipe_gen.read_hdl,
                progress_info=progress_pipe, write_canceller=pipe_gen,
            )
            tasks.append(task)
            self._log.debug("task created: %s", task)

        return tasks

    @asyncio.coroutine
    def create_glance_image_from_url_create_rpc(self, account_names, create_msg):
        """Create a catalog image from the URL given in a create RPC.

        Arguments:
            account_names - unused by this method
            create_msg - external_url part of the create-upload-job RPC

        Returns:
            The image object returned by the glance client

        Raises:
            ValueError - image_url is missing
            ImageRequestError - image_id is also given, or image_name is
                                missing
        """
        if "image_url" not in create_msg:
            raise ValueError("image_url must be specified")

        if "image_id" in create_msg:
            raise ImageRequestError("Cannot specify both image_url and image_id")

        if "image_name" not in create_msg:
            raise ImageRequestError("image_name must be specified when image_url is provided")

        # Optional fields are passed as None when absent from the message.
        glance_image = yield from asyncio.wait_for(
            self._loop.run_in_executor(
                None,
                self._glance_client.create_image_from_url,
                create_msg.image_url,
                create_msg.image_name,
                create_msg.image_checksum if "image_checksum" in create_msg else None,
                create_msg.disk_format if "disk_format" in create_msg else None,
                create_msg.container_format if "container_format" in create_msg else None,
            ),
            timeout=5,
            loop=self._loop,
        )

        return glance_image

    @asyncio.coroutine
    def create_tasks_from_glance_id(self, account_names, glance_image_id):
        """Create upload tasks for the catalog image with the given id."""
        return (yield from self.create_tasks(account_names, glance_image_id))

    @asyncio.coroutine
    def create_tasks_from_onboarded_create_rpc(self, account_names, create_msg):
        """Create upload tasks from the onboarded_image part of a create RPC.

        Fields absent from create_msg are passed to create_tasks() as None.
        """
        return (yield from self.create_tasks(
            account_names,
            create_msg.image_id if "image_id" in create_msg else None,
            create_msg.image_name if "image_name" in create_msg else None,
            create_msg.image_checksum if "image_checksum" in create_msg else None)
        )
445
class ImageMgrProject(ManoProject):
    """Image manager state and DTS handlers belonging to one project."""

    def __init__(self, name, tasklet, **kw):
        """Create the project; handlers are built later by register().

        Arguments:
            name - project name
            tasklet - owning tasklet; its log is handed to ManoProject
            kw - must contain 'client', the glance client to use
        """
        super(ImageMgrProject, self).__init__(tasklet.log, name)
        # presumably copies the tasklet's dts/loop/log handles - confirm
        # against ManoProject.update
        self.update(tasklet)
        try:
            self.glance_client = kw['client']
        except KeyError as e:
            self._log.exception("kw {}: {}".format(kw, e))

        # All of these stay None until register() has created them.
        self.cloud_cfg_subscriber = None
        self.job_controller = None
        self.task_creator = None
        self.rpc_handler = None
        self.show_handler = None

        # Cloud account name -> account; shared by reference with the task
        # creator and the RPC handler.
        self.cloud_accounts = {}

    @asyncio.coroutine
    def register(self):
        """Create and register all handlers of this project.

        Any exception is logged and swallowed, so some handlers may still be
        None afterwards.
        """
        try:
            self.log.debug("creating cloud account handler")
            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self._log,
                                                               self._dts,
                                                               self._log_hdl,
                                                               self)
            self.cloud_cfg_subscriber.register(
                self.on_cloud_account_create,
                self.on_cloud_account_delete
            )

            self.job_controller = upload.ImageUploadJobController(
                self.log, self.loop, self
            )

            self.task_creator = GlanceClientUploadTaskCreator(
                self.log, self.loop, self.cloud_accounts,
                self.glance_client, self
            )

            self.rpc_handler = ImageDTSRPCHandler(
                self.log, self.loop, self.dts, self.cloud_accounts,
                self.glance_client, self.task_creator,
                self.job_controller, self
            )
            yield from self.rpc_handler.register()

            self.show_handler = ImageDTSShowHandler(
                self.log, self.loop, self.dts, self.job_controller, self
            )
            yield from self.show_handler.register()
        except Exception as e:
            self.log.exception("Error during project {} register: {}".
                               format(self.name, e))

    def deregister(self):
        """Deregister every handler that register() managed to create."""
        self.log.debug("De-register handlers for project: {}".format(self.name))
        # register() swallows its errors, so any of these may still be None.
        for handler in (self.rpc_handler,
                        self.show_handler,
                        self.cloud_cfg_subscriber):
            if handler is not None:
                handler.deregister()

    def on_cloud_account_create(self, account):
        """Record a newly configured cloud account under its name."""
        self.log.debug("adding cloud account: %s", account.name)
        self.cloud_accounts[account.name] = account

    def on_cloud_account_delete(self, account_name):
        """Forget a cloud account; an unknown name is only logged."""
        self.log.debug("deleting cloud account: %s", account_name)
        if account_name not in self.cloud_accounts:
            self.log.warning("cloud account not found: %s", account_name)
        else:
            del self.cloud_accounts[account_name]
517
518
class ImageManagerTasklet(rift.tasklets.Tasklet):
    """
    Tasklet hosting the image manager.

    When DTS reaches INIT it starts the glance HTTP proxy server, creates the
    glance client and registers a ProjectHandler parameterised with
    ImageMgrProject.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")

        # Everything below is filled in by start() / init().
        self.dts = None
        self.http_proxy = None
        self.proxy_server = None
        self.glance_client = None
        self.project_handler = None

        self.projects = {}

    def start(self):
        """Start the tasklet and create its DTS handle."""
        super().start()
        self.log.info("Starting Image Manager Tasklet")

        self.log.debug("Registering with dts")
        schema = RwImageMgmtYang.get_schema()
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            schema,
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """Tear down the DTS handle; failures are logged, not raised."""
        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """Bring up the proxy server, the glance client and the project handler.

        Any exception is logged and swallowed.
        """
        try:
            self.log.debug("creating http proxy server")

            self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop)

            self.proxy_server = glance_proxy_server.GlanceHTTPProxyServer(
                self.log, self.loop, self.http_proxy
            )
            self.proxy_server.start()

            self.glance_client = glance_client.OpenstackGlanceClient.from_token(
                self.log, "127.0.0.1", "9292", "test"
            )

            self.log.debug("Creating project handler")
            self.project_handler = ProjectHandler(self, ImageMgrProject,
                                                  client=self.glance_client)
            self.project_handler.register()

        except Exception:
            self.log.exception("error during init")

    @asyncio.coroutine
    def run(self):
        """Nothing to do in the RUN state."""
        pass

    def on_instance_started(self):
        """Log that the instance started callback was received."""
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Runs the application handler for the given DTS state, if there is
        one, and then moves DTS on to the follow-up state, if one is defined.

        Arguments
            state - current dts state

        """
        # Application side work to perform on entering a DTS state.
        app_handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # DTS state to request once the application work is done.
        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        app_handler = app_handlers.get(state)
        if app_handler is not None:
            yield from app_handler()

        following_state = dts_transitions.get(state)
        if following_state is not None:
            self.dts.handle.set_state(following_state)