update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import threading
21 import time
22
23 import rift.tasklets
24 import rift.mano.cloud
25 from rift.mano.utils.project import (
26 ManoProject,
27 ProjectConfigCallbacks,
28 ProjectHandler,
29 get_add_delete_update_cfgs,
30 DEFAULT_PROJECT,
31 )
32
33 from . import glance_proxy_server
34 from . import glance_client
35 from . import upload
36
37 import gi
38 gi.require_version('RwImageMgmtYang', '1.0')
39 gi.require_version('RwLaunchpadYang', '1.0')
40 gi.require_version('RwDts', '1.0')
41
42 from gi.repository import (
43 RwcalYang,
44 RwDts as rwdts,
45 RwImageMgmtYang,
46 RwLaunchpadYang,
47 )
48
49
class ImageRequestError(Exception):
    """Base class for errors raised while servicing an image upload request."""
52
53
class AccountNotFoundError(ImageRequestError):
    """Raised when a requested cloud account name is not configured."""
56
57
class ImageNotFoundError(ImageRequestError):
    """Raised when no matching image can be found in the image catalog."""
60
61
class CloudAccountDtsHandler(object):
    """Manage the cloud account config subscription of a single project.

    Wraps a rift.mano.cloud.CloudAccountConfigSubscriber so the owner only has
    to supply the add/delete callbacks.
    """

    def __init__(self, log, dts, log_hdl, project):
        """Store the handles needed to create the subscriber later.

        Arguments:
            log - logger used for debug output
            dts - DTS handle passed through to the subscriber
            log_hdl - log handle passed through to the subscriber
            project - owning project; its name is used in log messages
        """
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._cloud_cfg_subscriber = None
        self._project = project

    @asyncio.coroutine
    def register(self, on_add_apply, on_delete_apply):
        """Create the cloud account config subscriber and register it.

        Arguments:
            on_add_apply - callback handed to the subscriber as on_add_apply
            on_delete_apply - callback handed to the subscriber as on_delete_apply
        """
        self._log.debug("Project {}: creating cloud account config handler".
                        format(self._project.name))
        self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
            self._dts, self._log, self._log_hdl, self._project,
            rift.mano.cloud.CloudAccountConfigCallbacks(
                on_add_apply=on_add_apply,
                on_delete_apply=on_delete_apply,
            )
        )
        yield from self._cloud_cfg_subscriber.register()

    def deregister(self):
        """Deregister the subscriber, if one was ever created.

        Safe to call when register() never ran or failed before the subscriber
        was created, and safe to call more than once.
        """
        self._log.debug("Project {}: Removing cloud account config handler".
                        format(self._project.name))
        if self._cloud_cfg_subscriber is None:
            return

        self._cloud_cfg_subscriber.deregister()
        self._cloud_cfg_subscriber = None
87
88
def openstack_image_to_image_info(openstack_image):
    """Build an ImageInfo CAL protobuf message from an OpenstackImage

    Arguments:
        openstack_image - An OpenstackImage instance

    Returns:
        An ImageInfo CAL protobuf message populated from the image
    """
    info_msg = RwcalYang.YangData_RwProject_Project_VimResources_ImageinfoList()

    # These attributes carry the same name on both objects.
    for attr in ("id", "name", "checksum", "container_format", "disk_format"):
        setattr(info_msg, attr, getattr(openstack_image, attr))

    # Every image property becomes one name/value entry in the message.
    image_props = openstack_image.properties
    for prop_name in image_props:
        entry = info_msg.properties.add()
        entry.name = prop_name
        entry.property_value = image_props[prop_name]

    # The image's "status" attribute is stored in the message's "state" field.
    info_msg.state = openstack_image.status

    return info_msg
115
116
class ImageDTSShowHandler(object):
    """ DTS publisher that serves reads of the upload-jobs data container """

    def __init__(self, project, job_controller):
        self._project = project
        self._job_controller = job_controller
        self._log = project.log
        self._loop = project.loop
        self._dts = project.dts

        # DTS registration handle; set by register(), cleared by deregister()
        self._subscriber = None

    def get_xpath(self):
        """ Return the upload-jobs xpath as qualified by the owning project """
        return self._project.add_project("D,/rw-image-mgmt:upload-jobs")

    @asyncio.coroutine
    def register(self):
        """ Register as a publisher and wait until the registration is ready """
        ready = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            ready.set()

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            # Only READ queries are answered; anything else is rejected.
            if action != rwdts.QueryAction.READ:
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                return

            jobs_msg = self._job_controller.pb_msg
            xact_info.respond_xpath(
                rwdts.XactRspCode.ACK,
                xpath=self.get_xpath(),
                msg=jobs_msg,
            )

        publish_xpath = self.get_xpath()
        reg_handler = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_prepare,
            on_ready=on_ready,
        )
        self._subscriber = yield from self._dts.register(
            xpath=publish_xpath,
            handler=reg_handler,
            flags=rwdts.Flag.PUBLISHER,
        )

        yield from ready.wait()

    def deregister(self):
        """ Remove the published element and drop the DTS registration """
        self._log.debug("Project {}: De-register show image handler".
                        format(self._project.name))
        registration = self._subscriber
        if not registration:
            return

        registration.delete_element(self.get_xpath())
        registration.deregister()
        self._subscriber = None
174
class ImageDTSRPCHandler(object):
    """ A DTS publisher for the upload-job RPC's """

    def __init__(self, project, glance_client, upload_task_creator, job_controller):
        """
        Arguments:
            project - owning project; supplies log, loop, dts and cloud accounts
            glance_client - image catalog client, used to delete temporary images
            upload_task_creator - creates glance images and upload tasks for a request
            job_controller - creates and looks up upload jobs
        """
        self._log = project.log
        self._loop = project.loop
        self._dts = project.dts
        self._glance_client = glance_client
        self._upload_task_creator = upload_task_creator
        self._job_controller = job_controller
        self._project = project

        # DTS registration handles for the two RPCs
        self._create = None
        self._cancel = None

    @property
    def accounts(self):
        """ The project's cloud accounts (mapping of account name to account) """
        return self._project.cloud_accounts

    @asyncio.coroutine
    def _register_create_upload_job(self):
        """ Register the create-upload-job RPC and wait for it to be ready """
        def get_xpath():
            return "/rw-image-mgmt:create-upload-job"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            create_msg = msg

            account_names = create_msg.cloud_account

            self._log.debug("Create upload job msg: {} ".format(msg.as_dict()))

            if not self._project.rpc_check(msg, xact_info):
                return

            # If cloud accounts were not specified, upload image to all cloud account
            if not account_names:
                account_names = list(self.accounts.keys())

            else:
                for account_name in account_names:
                    if account_name not in self.accounts:
                        # Interpolate the name here: exception constructors do
                        # not apply %-formatting to their arguments.
                        raise AccountNotFoundError(
                            "Could not find account {}".format(account_name)
                        )

            if create_msg.has_field("external_url"):
                glance_image = yield from self._upload_task_creator.create_glance_image_from_url_create_rpc(
                    account_names, create_msg.external_url
                )

                tasks = yield from self._upload_task_creator.create_tasks_from_glance_id(
                    account_names, glance_image.id
                )

                def delete_image(ft):
                    # Best effort: a failed cleanup is logged, never raised.
                    try:
                        self._glance_client.delete_image_from_id(glance_image.id)
                    except glance_client.OpenstackImageDeleteError:
                        self._log.warning(
                            "Failed to delete temporary image %s from the catalog",
                            glance_image.id,
                        )

                # Create a job and when the job completes delete the temporary
                # image from the catalog.
                job_id = self._job_controller.create_job(
                    tasks,
                    on_completed=delete_image
                )

            elif create_msg.has_field("onboarded_image"):
                self._log.debug("onboarded_image {} to accounts {}".
                                format(create_msg.onboarded_image, account_names))
                tasks = yield from self._upload_task_creator.create_tasks_from_onboarded_create_rpc(
                    account_names, create_msg.onboarded_image
                )
                job_id = self._job_controller.create_job(tasks)

            else:
                raise ImageRequestError("an image selection must be provided")

            rpc_out_msg = RwImageMgmtYang.YangOutput_RwImageMgmt_CreateUploadJob(job_id=job_id)

            xact_info.respond_xpath(
                rwdts.XactRspCode.ACK,
                xpath="O," + get_xpath(),
                msg=rpc_out_msg,
            )

        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(_, status):
            reg_event.set()

        self._create = yield from self._dts.register(
            xpath="I," + get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare,
                on_ready=on_ready,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

        yield from reg_event.wait()

    @asyncio.coroutine
    def _register_cancel_upload_job(self):
        """ Register the cancel-upload-job RPC and wait for it to be ready """
        def get_xpath():
            return "/rw-image-mgmt:cancel-upload-job"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            if not self._project.rpc_check(msg, xact_info):
                return

            # A cancel request without a job id cannot be served.
            if not msg.has_field("job_id"):
                self._log.error("cancel-upload-job missing job-id field.")
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                return

            job_id = msg.job_id

            job = self._job_controller.get_job(job_id)
            job.stop()

            xact_info.respond_xpath(
                rwdts.XactRspCode.ACK,
                xpath="O," + get_xpath(),
            )

        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(_, status):
            reg_event.set()

        self._cancel = yield from self._dts.register(
            xpath="I," + get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare,
                on_ready=on_ready,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

        yield from reg_event.wait()

    @asyncio.coroutine
    def register(self):
        """ Register for RPC's and wait for all registrations to complete """
        yield from self._register_create_upload_job()
        yield from self._register_cancel_upload_job()

    def deregister(self):
        """ Drop whichever RPC registrations currently exist """
        self._log.debug("Project {}: Deregister image rpc handlers".
                        format(self._project.name))
        if self._create:
            self._create.deregister()
            self._create = None

        if self._cancel:
            self._cancel.deregister()
            self._cancel = None
334
335
class GlanceClientUploadTaskCreator(object):
    """ This class creates upload tasks using configured cloud accounts and
    configured image catalog glance client """

    def __init__(self, project, glance_client):
        """
        Arguments:
            project - owning project; supplies log, loop and cloud accounts
            glance_client - client for the image catalog
        """
        self._log = project.log
        self._loop = project.loop
        self._glance_client = glance_client
        self._project = project

    @property
    def accounts(self):
        """ The project's cloud accounts (mapping of account name to account) """
        return self._project.cloud_accounts

    @asyncio.coroutine
    def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None):
        """ Create a list of UploadTasks for a list of cloud accounts
        and a image with a matching image_name and image_checksum in the
        catalog

        Arguments:
            account_names - A list of configured cloud account names
            image_id - A image id
            image_name - A image name
            image_checksum - A image checksum

        Returns:
            A list of AccountImageUploadTask instances

        Raises:
            ImageNotFoundError - Could not find a matching image in the
                                 image catalog

            AccountNotFoundError - Could not find an account that matched
                                   the provided account name
        """
        try:
            # The catalog lookup is run in the default executor and is given
            # at most 5 seconds to complete.
            image = yield from asyncio.wait_for(
                self._loop.run_in_executor(
                    None,
                    self._glance_client.find_active_image,
                    image_id,
                    image_name,
                    image_checksum,
                ),
                timeout=5,
                loop=self._loop,
            )

        except glance_client.OpenstackImageError as e:
            msg = "Could not find image in Openstack to upload"
            self._log.exception(msg)
            raise ImageNotFoundError(msg) from e

        image_info = openstack_image_to_image_info(image)
        self._log.debug("created image info: %s", image_info)

        # Validate every account before any pipe is started.
        for account_name in account_names:
            if account_name not in self.accounts:
                # Interpolate the name here: exception constructors do not
                # apply %-formatting to their arguments.
                raise AccountNotFoundError(
                    "Could not find account {}".format(account_name)
                )

        # For each account name provided, create a pipe (GlanceImagePipeGen)
        # which feeds data into the UploadTask while also monitoring the various
        # transmit stats (progress, bytes written, bytes per second, etc)
        tasks = []
        for account_name in account_names:
            account = self.accounts[account_name]
            self._log.debug("creating task for account %s", account.name)
            glance_data_gen = self._glance_client.get_image_data(image_info.id)

            pipe_gen = upload.GlanceImagePipeGen(self._log, self._loop, glance_data_gen)
            progress_pipe = upload.UploadProgressWriteProxy(
                self._log, self._loop, image.size, pipe_gen.write_hdl
            )
            progress_pipe.start_rate_monitoring()
            # Route the pipe's writes through the progress proxy.
            pipe_gen.write_hdl = progress_pipe
            pipe_gen.start()

            task = upload.AccountImageUploadTask(
                self._log, self._loop, account, image_info, pipe_gen.read_hdl,
                progress_info=progress_pipe, write_canceller=pipe_gen,
            )
            tasks.append(task)
            self._log.debug("task created: %s", task)

        return tasks

    @asyncio.coroutine
    def create_glance_image_from_url_create_rpc(self, account_names, create_msg):
        """ Create an image in the catalog from the url given in a create RPC

        Arguments:
            account_names - A list of cloud account names (not used here)
            create_msg - The external-url part of the create RPC; must carry
                         image_url and image_name, must not carry image_id

        Returns:
            The value returned by the glance client's create_image_from_url

        Raises:
            ValueError - image_url is missing
            ImageRequestError - image_id is present or image_name is missing
        """
        if "image_url" not in create_msg:
            raise ValueError("image_url must be specified")

        if "image_id" in create_msg:
            raise ImageRequestError("Cannot specify both image_url and image_id")

        if "image_name" not in create_msg:
            raise ImageRequestError("image_name must be specified when image_url is provided")

        glance_image = yield from asyncio.wait_for(
            self._loop.run_in_executor(
                None,
                self._glance_client.create_image_from_url,
                create_msg.image_url,
                create_msg.image_name,
                create_msg.image_checksum if "image_checksum" in create_msg else None,
                create_msg.disk_format if "disk_format" in create_msg else None,
                create_msg.container_format if "container_format" in create_msg else None,
            ),
            timeout=5,
            loop=self._loop,
        )

        return glance_image

    @asyncio.coroutine
    def create_tasks_from_glance_id(self, account_names, glance_image_id):
        """ Create upload tasks for the catalog image with the given id """
        return (yield from self.create_tasks(account_names, glance_image_id))

    @asyncio.coroutine
    def create_tasks_from_onboarded_create_rpc(self, account_names, create_msg):
        """ Create upload tasks from the onboarded-image part of a create RPC

        Fields that are absent from create_msg are passed on as None.
        """
        return (yield from self.create_tasks(
            account_names,
            create_msg.image_id if "image_id" in create_msg else None,
            create_msg.image_name if "image_name" in create_msg else None,
            create_msg.image_checksum if "image_checksum" in create_msg else None)
        )
462
class ImageMgrProject(ManoProject):
    """ Per-project state and DTS handlers of the image manager """

    def __init__(self, name, tasklet, **kw):
        """
        Arguments:
            name - project name
            tasklet - owning tasklet; its log is passed to ManoProject and the
                      tasklet itself to update()
            kw - must contain 'client', the glance client used by this project
        """
        super(ImageMgrProject, self).__init__(tasklet.log, name)
        self.update(tasklet)

        # Always define the attribute so a missing 'client' is reported once
        # here rather than as an AttributeError later.
        self.glance_client = None
        try:
            self.glance_client = kw['client']
        except KeyError as e:
            self._log.exception("kw {}: {}".format(kw, e))

        self.cloud_cfg_subscriber = None
        self.job_controller = None
        self.task_creator = None
        self.rpc_handler = None
        self.show_handler = None

        # Cloud accounts known to this project, keyed by account name
        self.cloud_accounts = {}

    @asyncio.coroutine
    def register(self):
        """ Create and register all handlers of this project

        Any exception is logged and swallowed; handlers created before the
        failure stay assigned.
        """
        try:
            self.log.debug("creating cloud account handler")
            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self._log,
                                                               self._dts,
                                                               self._log_hdl,
                                                               self)
            yield from self.cloud_cfg_subscriber.register(
                self.on_cloud_account_create,
                self.on_cloud_account_delete
            )

            self.job_controller = upload.ImageUploadJobController(
                self
            )

            self.task_creator = GlanceClientUploadTaskCreator(
                self, self.glance_client,
            )

            self.rpc_handler = ImageDTSRPCHandler(
                self, self.glance_client, self.task_creator,
                self.job_controller,
            )
            yield from self.rpc_handler.register()

            self.show_handler = ImageDTSShowHandler(
                self, self.job_controller,
            )
            yield from self.show_handler.register()
        except Exception as e:
            self.log.exception("Error during project {} register: {}".
                               format(self.name, e))

    def deregister(self):
        """ Deregister every handler that was created by register() """
        self.log.debug("De-register handlers for project: {}".format(self.name))
        # A handler is still None if register() never ran or failed early.
        for handler in (self.rpc_handler, self.show_handler,
                        self.cloud_cfg_subscriber):
            if handler is not None:
                handler.deregister()

    def on_cloud_account_create(self, account):
        """ Remember a newly configured cloud account under its name """
        self.log.debug("adding cloud account: %s", account.name)
        self.cloud_accounts[account.name] = account

    def on_cloud_account_delete(self, account_name):
        """ Forget a cloud account; an unknown name only logs a warning """
        self.log.debug("deleting cloud account: %s", account_name)
        if account_name not in self.cloud_accounts:
            self.log.warning("cloud account not found: %s", account_name)
        else:
            del self.cloud_accounts[account_name]
532
class ImageManagerTasklet(rift.tasklets.Tasklet):
    """
    Tasklet hosting the image manager: it starts the glance HTTP proxy
    server, creates the glance client and registers the project handler
    that builds an ImageMgrProject per project.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")

        # Populated by start() and init()
        self.dts = None
        self.http_proxy = None
        self.proxy_server = None
        self.glance_client = None
        self.project_handler = None

        self.projects = {}

    def start(self):
        """ Start the tasklet and create its DTS API object """
        super().start()
        self.log.info("Starting Image Manager Tasklet")

        self.log.debug("Registering with dts")
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwImageMgmtYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """ Tear down the DTS API object; failures are only logged """
        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """ Bring up the proxy server, glance client and project handler

        Any exception raised on the way is logged and swallowed.
        """
        try:
            self.log.debug("creating http proxy server")

            self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop)

            self.proxy_server = glance_proxy_server.GlanceHTTPProxyServer(
                self.log, self.loop, self.http_proxy
            )
            self.proxy_server.start()

            self.glance_client = glance_client.OpenstackGlanceClient.from_token(
                self.log, "127.0.0.1", "9292", "test"
            )

            self.log.debug("Creating project handler")
            self.project_handler = ProjectHandler(self, ImageMgrProject,
                                                  client=self.glance_client)
            self.project_handler.register()

        except Exception:
            self.log.exception("error during init")

    @asyncio.coroutine
    def run(self):
        """ Nothing to do in the RUN state """
        pass

    def on_instance_started(self):
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Run the application handler mapped to the current DTS state, if any,
        then move DTS to the follow-up state mapped to it, if any.

        Arguments
            state - current dts state

        """
        app_actions = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Transition application to next state
        action = app_actions.get(state)
        if action is not None:
            yield from action()

        # Transition dts to next state
        following_state = dts_transitions.get(state)
        if following_state is not None:
            self.dts.handle.set_state(following_state)