3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
24 import rift
.mano
.cloud
25 from rift
.mano
.utils
.project
import (
27 ProjectConfigCallbacks
,
29 get_add_delete_update_cfgs
,
33 from . import glance_proxy_server
34 from . import glance_client
38 gi
.require_version('RwImageMgmtYang', '1.0')
39 gi
.require_version('RwLaunchpadYang', '1.0')
40 gi
.require_version('RwDts', '1.0')
42 from gi
.repository
import (
50 class ImageRequestError(Exception):
54 class AccountNotFoundError(ImageRequestError
):
58 class ImageNotFoundError(ImageRequestError
):
62 class CloudAccountDtsHandler(object):
63 def __init__(self
, log
, dts
, log_hdl
, project
):
66 self
._log
_hdl
= log_hdl
67 self
._cloud
_cfg
_subscriber
= None
68 self
._project
= project
71 def register(self
, on_add_apply
, on_delete_apply
):
72 self
._log
.debug("Project {}: creating cloud account config handler".
73 format(self
._project
.name
))
74 self
._cloud
_cfg
_subscriber
= rift
.mano
.cloud
.CloudAccountConfigSubscriber(
75 self
._dts
, self
._log
, self
._log
_hdl
, self
._project
,
76 rift
.mano
.cloud
.CloudAccountConfigCallbacks(
77 on_add_apply
=on_add_apply
,
78 on_delete_apply
=on_delete_apply
,
81 yield from self
._cloud
_cfg
_subscriber
.register()
84 self
._log
.debug("Project {}: Removing cloud account config handler".
85 format(self
._project
.name
))
86 self
._cloud
_cfg
_subscriber
.deregister()
89 def openstack_image_to_image_info(openstack_image
):
90 """Convert the OpenstackImage to a ImageInfo protobuf message
93 openstack_image - A OpenstackImage instance
96 A ImageInfo CAL protobuf message
99 image_info
= RwcalYang
.YangData_RwProject_Project_VimResources_ImageinfoList()
101 copy_fields
= ["id", "name", "checksum", "container_format", "disk_format"]
102 for field
in copy_fields
:
103 value
= getattr(openstack_image
, field
)
104 setattr(image_info
, field
, value
)
106 value
= getattr(openstack_image
, "properties")
108 prop
= image_info
.properties
.add()
110 prop
.property_value
= value
[key
]
112 image_info
.state
= openstack_image
.status
117 class ImageDTSShowHandler(object):
118 """ A DTS publisher for the upload-jobs data container """
119 def __init__(self
, project
, job_controller
):
120 self
._log
= project
.log
121 self
._loop
= project
.loop
122 self
._dts
= project
.dts
123 self
._job
_controller
= job_controller
124 self
._project
= project
126 self
._subscriber
= None
129 return self
._project
.add_project("D,/rw-image-mgmt:upload-jobs")
133 """ Register as a publisher and wait for reg_ready to complete """
136 def on_prepare(xact_info
, action
, ks_path
, msg
):
137 if action
!= rwdts
.QueryAction
.READ
:
138 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
141 jobs_pb_msg
= self
._job
_controller
.pb_msg
143 xact_info
.respond_xpath(
144 rwdts
.XactRspCode
.ACK
,
145 xpath
=self
.get_xpath(),
149 reg_event
= asyncio
.Event(loop
=self
._loop
)
152 def on_ready(regh
, status
):
155 self
._subscriber
= yield from self
._dts
.register(
156 xpath
=self
.get_xpath(),
157 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
158 on_prepare
=on_prepare
,
161 flags
=rwdts
.Flag
.PUBLISHER
,
164 yield from reg_event
.wait()
167 def deregister(self
):
168 self
._log
.debug("Project {}: De-register show image handler".
169 format(self
._project
.name
))
171 self
._subscriber
.delete_element(self
.get_xpath())
172 self
._subscriber
.deregister()
173 self
._subscriber
= None
175 class ImageDTSRPCHandler(object):
176 """ A DTS publisher for the upload-job RPC's """
177 def __init__(self
, project
, glance_client
, upload_task_creator
, job_controller
):
178 self
._log
= project
.log
179 self
._loop
= project
.loop
180 self
._dts
= project
.dts
181 self
._glance
_client
= glance_client
182 self
._upload
_task
_creator
= upload_task_creator
183 self
._job
_controller
= job_controller
184 self
._project
= project
191 return self
._project
.cloud_accounts
194 def _register_create_upload_job(self
):
196 return "/rw-image-mgmt:create-upload-job"
199 def on_prepare(xact_info
, action
, ks_path
, msg
):
202 account_names
= create_msg
.cloud_account
204 self
._log
.debug("Create upload job msg: {} ".format(msg
.as_dict()))
206 if not self
._project
.rpc_check(msg
, xact_info
):
209 # If cloud accounts were not specified, upload image to all cloud account
210 if not account_names
:
211 account_names
= list(self
.accounts
.keys())
214 for account_name
in account_names
:
215 if account_name
not in self
.accounts
:
216 raise AccountNotFoundError("Could not find account %s", account_name
)
218 if create_msg
.has_field("external_url"):
219 glance_image
= yield from self
._upload
_task
_creator
.create_glance_image_from_url_create_rpc(
220 account_names
, create_msg
.external_url
223 tasks
= yield from self
._upload
_task
_creator
.create_tasks_from_glance_id(
224 account_names
, glance_image
.id
227 def delete_image(ft
):
229 self
._glance
_client
.delete_image_from_id(glance_image
.id)
230 except glance_client
.OpenstackImageDeleteError
:
233 # Create a job and when the job completes delete the temporary
234 # image from the catalog.
235 job_id
= self
._job
_controller
.create_job(
237 on_completed
=delete_image
240 elif create_msg
.has_field("onboarded_image"):
241 self
._log
.debug("onboarded_image {} to accounts {}".
242 format(create_msg
.onboarded_image
, account_names
))
243 tasks
= yield from self
._upload
_task
_creator
.create_tasks_from_onboarded_create_rpc(
244 account_names
, create_msg
.onboarded_image
246 job_id
= self
._job
_controller
.create_job(tasks
)
249 raise ImageRequestError("an image selection must be provided")
251 rpc_out_msg
= RwImageMgmtYang
.YangOutput_RwImageMgmt_CreateUploadJob(job_id
=job_id
)
253 xact_info
.respond_xpath(
254 rwdts
.XactRspCode
.ACK
,
255 xpath
="O," + get_xpath(),
259 reg_event
= asyncio
.Event(loop
=self
._loop
)
262 def on_ready(_
, status
):
265 self
._create
= yield from self
._dts
.register(
266 xpath
="I," + get_xpath(),
267 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
268 on_prepare
=on_prepare
,
271 flags
=rwdts
.Flag
.PUBLISHER
,
274 yield from reg_event
.wait()
277 def _register_cancel_upload_job(self
):
279 return "/rw-image-mgmt:cancel-upload-job"
282 def on_prepare(xact_info
, action
, ks_path
, msg
):
283 if not self
._project
.rpc_check(msg
, xact_info
):
286 if not msg
.has_field("job_id"):
287 self
._log
.error("cancel-upload-job missing job-id field.")
288 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
293 job
= self
._job
_controller
.get_job(job_id
)
296 xact_info
.respond_xpath(
297 rwdts
.XactRspCode
.ACK
,
298 xpath
="O," + get_xpath(),
301 reg_event
= asyncio
.Event(loop
=self
._loop
)
304 def on_ready(_
, status
):
307 self
._cancel
= yield from self
._dts
.register(
308 xpath
="I," + get_xpath(),
309 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
310 on_prepare
=on_prepare
,
313 flags
=rwdts
.Flag
.PUBLISHER
,
316 yield from reg_event
.wait()
320 """ Register for RPC's and wait for all registrations to complete """
321 yield from self
._register
_create
_upload
_job
()
322 yield from self
._register
_cancel
_upload
_job
()
324 def deregister(self
):
325 self
._log
.debug("Project {}: Deregister image rpc handlers".
326 format(self
._project
.name
))
328 self
._create
.deregister()
332 self
._cancel
.deregister()
336 class GlanceClientUploadTaskCreator(object):
337 """ This class creates upload tasks using configured cloud accounts and
338 configured image catalog glance client """
340 def __init__(self
, project
, glance_client
):
341 self
._log
= project
.log
342 self
._loop
= project
.loop
343 self
._glance
_client
= glance_client
344 self
._project
= project
348 return self
._project
.cloud_accounts
351 def create_tasks(self
, account_names
, image_id
=None, image_name
=None, image_checksum
=None):
352 """ Create a list of UploadTasks for a list of cloud accounts
353 and a image with a matching image_name and image_checksum in the
357 account_names - A list of configured cloud account names
358 image_id - A image id
359 image_name - A image name
360 image_checksum - A image checksum
363 A list of AccountImageUploadTask instances
366 ImageNotFoundError - Could not find a matching image in the
369 AccountNotFoundError - Could not find an account that matched
370 the provided account name
373 image
= yield from asyncio
.wait_for(
374 self
._loop
.run_in_executor(
376 self
._glance
_client
.find_active_image
,
385 except glance_client
.OpenstackImageError
as e
:
386 msg
= "Could not find image in Openstack to upload"
387 self
._log
.exception(msg
)
388 raise ImageNotFoundError(msg
) from e
390 image_info
= openstack_image_to_image_info(image
)
391 self
._log
.debug("created image info: %s", image_info
)
394 for account_name
in account_names
:
395 if account_name
not in self
.accounts
:
396 raise AccountNotFoundError("Could not find account %s", account_name
)
398 # For each account name provided, create a pipe (GlanceImagePipeGen)
399 # which feeds data into the UploadTask while also monitoring the various
400 # transmit stats (progress, bytes written, bytes per second, etc)
401 for account_name
in account_names
:
402 account
= self
.accounts
[account_name
]
403 self
._log
.debug("creating task for account %s", account
.name
)
404 glance_data_gen
= self
._glance
_client
.get_image_data(image_info
.id)
406 pipe_gen
= upload
.GlanceImagePipeGen(self
._log
, self
._loop
, glance_data_gen
)
407 progress_pipe
= upload
.UploadProgressWriteProxy(
408 self
._log
, self
._loop
, image
.size
, pipe_gen
.write_hdl
410 progress_pipe
.start_rate_monitoring()
411 pipe_gen
.write_hdl
= progress_pipe
414 task
= upload
.AccountImageUploadTask(
415 self
._log
, self
._loop
, account
, image_info
, pipe_gen
.read_hdl
,
416 progress_info
=progress_pipe
, write_canceller
=pipe_gen
,
419 self
._log
.debug("task created: %s", task
)
424 def create_glance_image_from_url_create_rpc(self
, account_names
, create_msg
):
425 if "image_url" not in create_msg
:
426 raise ValueError("image_url must be specified")
428 if "image_id" in create_msg
:
429 raise ImageRequestError("Cannot specify both image_url and image_id")
431 if "image_name" not in create_msg
:
432 raise ImageRequestError("image_name must be specified when image_url is provided")
434 glance_image
= yield from asyncio
.wait_for(
435 self
._loop
.run_in_executor(
437 self
._glance
_client
.create_image_from_url
,
438 create_msg
.image_url
,
439 create_msg
.image_name
,
440 create_msg
.image_checksum
if "image_checksum" in create_msg
else None,
441 create_msg
.disk_format
if "disk_format" in create_msg
else None,
442 create_msg
.container_format
if "container_format" in create_msg
else None,
451 def create_tasks_from_glance_id(self
, account_names
, glance_image_id
):
452 return (yield from self
.create_tasks(account_names
, glance_image_id
))
455 def create_tasks_from_onboarded_create_rpc(self
, account_names
, create_msg
):
456 return (yield from self
.create_tasks(
458 create_msg
.image_id
if "image_id" in create_msg
else None,
459 create_msg
.image_name
if "image_name" in create_msg
else None,
460 create_msg
.image_checksum
if "image_checksum" in create_msg
else None)
463 class ImageMgrProject(ManoProject
):
465 def __init__(self
, name
, tasklet
, **kw
):
466 super(ImageMgrProject
, self
).__init
__(tasklet
.log
, name
)
469 self
.glance_client
= kw
['client']
470 except KeyError as e
:
471 self
._log
.exception("kw {}: {}".format(kw
, e
))
473 self
.cloud_cfg_subscriber
= None
474 self
.job_controller
= None
475 self
.task_creator
= None
476 self
.rpc_handler
= None
477 self
.show_handler
= None
479 self
.cloud_accounts
= {}
484 self
.log
.debug("creating cloud account handler")
485 self
.cloud_cfg_subscriber
= CloudAccountDtsHandler(self
._log
,
489 yield from self
.cloud_cfg_subscriber
.register(
490 self
.on_cloud_account_create
,
491 self
.on_cloud_account_delete
494 self
.job_controller
= upload
.ImageUploadJobController(
498 self
.task_creator
= GlanceClientUploadTaskCreator(
499 self
, self
.glance_client
,
502 self
.rpc_handler
= ImageDTSRPCHandler(
503 self
, self
.glance_client
, self
.task_creator
,
506 yield from self
.rpc_handler
.register()
508 self
.show_handler
= ImageDTSShowHandler(
509 self
, self
.job_controller
,
511 yield from self
.show_handler
.register()
512 except Exception as e
:
513 self
.log
.exception("Error during project {} register: e".
514 format(self
.name
, e
))
516 def deregister(self
):
517 self
.log
.debug("De-register handlers for project: {}".format(self
.name
))
518 self
.rpc_handler
.deregister()
519 self
.show_handler
.deregister()
520 self
.cloud_cfg_subscriber
.deregister()
522 def on_cloud_account_create(self
, account
):
523 self
.log
.debug("adding cloud account: %s", account
.name
)
524 self
.cloud_accounts
[account
.name
] = account
526 def on_cloud_account_delete(self
, account_name
):
527 self
.log
.debug("deleting cloud account: %s", account_name
)
528 if account_name
not in self
.cloud_accounts
:
529 self
.log
.warning("cloud account not found: %s", account_name
)
531 del self
.cloud_accounts
[account_name
]
533 class ImageManagerTasklet(rift
.tasklets
.Tasklet
):
535 The RwImageMgrTasklet provides a interface for DTS to interact with an
536 instance of the Monitor class. This allows the Monitor class to remain
540 def __init__(self
, *args
, **kwargs
):
541 super().__init
__(*args
, **kwargs
)
542 self
.rwlog
.set_category("rw-mano-log")
544 self
.http_proxy
= None
545 self
.proxy_server
= None
547 self
.glance_client
= None
548 self
.project_handler
= None
554 self
.log
.info("Starting Image Manager Tasklet")
556 self
.log
.debug("Registering with dts")
557 self
.dts
= rift
.tasklets
.DTS(
559 RwImageMgmtYang
.get_schema(),
561 self
.on_dts_state_change
564 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
569 except Exception as e
:
570 self
.log
.exception(e
)
575 self
.log
.debug("creating http proxy server")
577 self
.http_proxy
= glance_proxy_server
.QuickProxyServer(self
.log
, self
.loop
)
579 self
.proxy_server
= glance_proxy_server
.GlanceHTTPProxyServer(
580 self
.log
, self
.loop
, self
.http_proxy
582 self
.proxy_server
.start()
584 self
.glance_client
= glance_client
.OpenstackGlanceClient
.from_token(
585 self
.log
, "127.0.0.1", "9292", "test"
588 self
.log
.debug("Creating project handler")
589 self
.project_handler
= ProjectHandler(self
, ImageMgrProject
,
590 client
=self
.glance_client
)
591 self
.project_handler
.register()
593 except Exception as e
:
594 self
.log
.exception("error during init")
600 def on_instance_started(self
):
601 self
.log
.debug("Got instance started callback")
604 def on_dts_state_change(self
, state
):
605 """Handle DTS state change
607 Take action according to current DTS state to transition application
608 into the corresponding application state
611 state - current dts state
615 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
616 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
620 rwdts
.State
.INIT
: self
.init
,
621 rwdts
.State
.RUN
: self
.run
,
624 # Transition application to next state
625 handler
= handlers
.get(state
, None)
626 if handler
is not None:
629 # Transition dts to next state
630 next_state
= switch
.get(state
, None)
631 if next_state
is not None:
632 self
.dts
.handle
.set_state(next_state
)