3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
24 import rift
.mano
.cloud
26 from . import glance_proxy_server
27 from . import glance_client
31 gi
.require_version('RwImageMgmtYang', '1.0')
32 gi
.require_version('RwLaunchpadYang', '1.0')
33 gi
.require_version('RwDts', '1.0')
35 from gi
.repository
import (
43 class ImageRequestError(Exception):
47 class AccountNotFoundError(ImageRequestError
):
51 class ImageNotFoundError(ImageRequestError
):
55 class CloudAccountDtsHandler(object):
56 def __init__(self
, log
, dts
, log_hdl
):
59 self
._log
_hdl
= log_hdl
60 self
._cloud
_cfg
_subscriber
= None
62 def register(self
, on_add_apply
, on_delete_apply
):
63 self
._log
.debug("creating cloud account config handler")
64 self
._cloud
_cfg
_subscriber
= rift
.mano
.cloud
.CloudAccountConfigSubscriber(
65 self
._dts
, self
._log
, self
._log
_hdl
,
66 rift
.mano
.cloud
.CloudAccountConfigCallbacks(
67 on_add_apply
=on_add_apply
,
68 on_delete_apply
=on_delete_apply
,
71 self
._cloud
_cfg
_subscriber
.register()
74 def openstack_image_to_image_info(openstack_image
):
75 """Convert the OpenstackImage to a ImageInfo protobuf message
78 openstack_image - A OpenstackImage instance
81 A ImageInfo CAL protobuf message
84 image_info
= RwcalYang
.ImageInfoItem()
86 copy_fields
= ["id", "name", "checksum", "container_format", "disk_format"]
87 for field
in copy_fields
:
88 value
= getattr(openstack_image
, field
)
89 setattr(image_info
, field
, value
)
91 image_info
.state
= openstack_image
.status
96 class ImageDTSShowHandler(object):
97 """ A DTS publisher for the upload-jobs data container """
98 def __init__(self
, log
, loop
, dts
, job_controller
):
102 self
._job
_controller
= job_controller
104 self
._subscriber
= None
108 """ Register as a publisher and wait for reg_ready to complete """
110 return "D,/rw-image-mgmt:upload-jobs"
113 def on_prepare(xact_info
, action
, ks_path
, msg
):
114 if action
!= rwdts
.QueryAction
.READ
:
115 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
118 jobs_pb_msg
= self
._job
_controller
.pb_msg
120 xact_info
.respond_xpath(
121 rwdts
.XactRspCode
.ACK
,
126 reg_event
= asyncio
.Event(loop
=self
._loop
)
129 def on_ready(regh
, status
):
132 self
._subscriber
= yield from self
._dts
.register(
134 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
135 on_prepare
=on_prepare
,
138 flags
=rwdts
.Flag
.PUBLISHER
,
141 yield from reg_event
.wait()
144 class ImageDTSRPCHandler(object):
145 """ A DTS publisher for the upload-job RPC's """
146 def __init__(self
, log
, loop
, dts
, accounts
, glance_client
, upload_task_creator
, job_controller
):
150 self
._accounts
= accounts
151 self
._glance
_client
= glance_client
152 self
._upload
_task
_creator
= upload_task_creator
153 self
._job
_controller
= job_controller
155 self
._subscriber
= None
158 def _register_create_upload_job(self
):
160 return "/rw-image-mgmt:create-upload-job"
163 def on_prepare(xact_info
, action
, ks_path
, msg
):
166 account_names
= create_msg
.cloud_account
167 # If cloud accounts were not specified, upload image to all cloud account
168 if not account_names
:
169 account_names
= list(self
._accounts
.keys())
171 for account_name
in account_names
:
172 if account_name
not in self
._accounts
:
173 raise AccountNotFoundError("Could not find account %s", account_name
)
175 if create_msg
.has_field("external_url"):
176 glance_image
= yield from self
._upload
_task
_creator
.create_glance_image_from_url_create_rpc(
177 account_names
, create_msg
.external_url
180 tasks
= yield from self
._upload
_task
_creator
.create_tasks_from_glance_id(
181 account_names
, glance_image
.id
184 def delete_image(ft
):
186 self
._glance
_client
.delete_image_from_id(glance_image
.id)
187 except glance_client
.OpenstackImageDeleteError
:
190 # Create a job and when the job completes delete the temporary
191 # image from the catalog.
192 job_id
= self
._job
_controller
.create_job(
194 on_completed
=delete_image
197 elif create_msg
.has_field("onboarded_image"):
198 tasks
= yield from self
._upload
_task
_creator
.create_tasks_from_onboarded_create_rpc(
199 account_names
, create_msg
.onboarded_image
201 job_id
= self
._job
_controller
.create_job(tasks
)
204 raise ImageRequestError("an image selection must be provided")
206 rpc_out_msg
= RwImageMgmtYang
.CreateUploadJobOutput(job_id
=job_id
)
208 xact_info
.respond_xpath(
209 rwdts
.XactRspCode
.ACK
,
210 xpath
="O," + get_xpath(),
214 reg_event
= asyncio
.Event(loop
=self
._loop
)
217 def on_ready(_
, status
):
220 self
._subscriber
= yield from self
._dts
.register(
221 xpath
="I," + get_xpath(),
222 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
223 on_prepare
=on_prepare
,
226 flags
=rwdts
.Flag
.PUBLISHER
,
229 yield from reg_event
.wait()
232 def _register_cancel_upload_job(self
):
234 return "/rw-image-mgmt:cancel-upload-job"
237 def on_prepare(xact_info
, action
, ks_path
, msg
):
238 if not msg
.has_field("job_id"):
239 self
._log
.error("cancel-upload-job missing job-id field.")
240 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
245 job
= self
._job
_controller
.get_job(job_id
)
248 xact_info
.respond_xpath(
249 rwdts
.XactRspCode
.ACK
,
250 xpath
="O," + get_xpath(),
253 reg_event
= asyncio
.Event(loop
=self
._loop
)
256 def on_ready(_
, status
):
259 self
._subscriber
= yield from self
._dts
.register(
260 xpath
="I," + get_xpath(),
261 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
262 on_prepare
=on_prepare
,
265 flags
=rwdts
.Flag
.PUBLISHER
,
268 yield from reg_event
.wait()
272 """ Register for RPC's and wait for all registrations to complete """
273 yield from self
._register
_create
_upload
_job
()
274 yield from self
._register
_cancel
_upload
_job
()
277 class GlanceClientUploadTaskCreator(object):
278 """ This class creates upload tasks using configured cloud accounts and
279 configured image catalog glance client """
281 def __init__(self
, log
, loop
, accounts
, glance_client
):
284 self
._accounts
= accounts
285 self
._glance
_client
= glance_client
288 def create_tasks(self
, account_names
, image_id
=None, image_name
=None, image_checksum
=None):
289 """ Create a list of UploadTasks for a list of cloud accounts
290 and a image with a matching image_name and image_checksum in the
294 account_names - A list of configured cloud account names
295 image_id - A image id
296 image_name - A image name
297 image_checksum - A image checksum
300 A list of AccountImageUploadTask instances
303 ImageNotFoundError - Could not find a matching image in the
306 AccountNotFoundError - Could not find an account that matched
307 the provided account name
310 image
= yield from asyncio
.wait_for(
311 self
._loop
.run_in_executor(
313 self
._glance
_client
.find_active_image
,
322 except glance_client
.OpenstackImageError
as e
:
323 msg
= "Could not find image in Openstack to upload"
324 self
._log
.exception(msg
)
325 raise ImageNotFoundError(msg
) from e
327 image_info
= openstack_image_to_image_info(image
)
328 self
._log
.debug("created image info: %s", image_info
)
331 for account_name
in account_names
:
332 if account_name
not in self
._accounts
:
333 raise AccountNotFoundError("Could not find account %s", account_name
)
335 # For each account name provided, create a pipe (GlanceImagePipeGen)
336 # which feeds data into the UploadTask while also monitoring the various
337 # transmit stats (progress, bytes written, bytes per second, etc)
338 for account_name
in account_names
:
339 account
= self
._accounts
[account_name
]
340 self
._log
.debug("creating task for account %s", account
.name
)
341 glance_data_gen
= self
._glance
_client
.get_image_data(image_info
.id)
343 pipe_gen
= upload
.GlanceImagePipeGen(self
._log
, self
._loop
, glance_data_gen
)
344 progress_pipe
= upload
.UploadProgressWriteProxy(
345 self
._log
, self
._loop
, image
.size
, pipe_gen
.write_hdl
347 progress_pipe
.start_rate_monitoring()
348 pipe_gen
.write_hdl
= progress_pipe
351 task
= upload
.AccountImageUploadTask(
352 self
._log
, self
._loop
, account
, image_info
, pipe_gen
.read_hdl
,
353 progress_info
=progress_pipe
, write_canceller
=pipe_gen
,
356 self
._log
.debug("task created: %s", task
)
361 def create_glance_image_from_url_create_rpc(self
, account_names
, create_msg
):
362 if "image_url" not in create_msg
:
363 raise ValueError("image_url must be specified")
365 if "image_id" in create_msg
:
366 raise ImageRequestError("Cannot specify both image_url and image_id")
368 if "image_name" not in create_msg
:
369 raise ImageRequestError("image_name must be specified when image_url is provided")
371 glance_image
= yield from asyncio
.wait_for(
372 self
._loop
.run_in_executor(
374 self
._glance
_client
.create_image_from_url
,
375 create_msg
.image_url
,
376 create_msg
.image_name
,
377 create_msg
.image_checksum
if "image_checksum" in create_msg
else None,
378 create_msg
.disk_format
if "disk_format" in create_msg
else None,
379 create_msg
.container_format
if "container_format" in create_msg
else None,
388 def create_tasks_from_glance_id(self
, account_names
, glance_image_id
):
389 return (yield from self
.create_tasks(account_names
, glance_image_id
))
392 def create_tasks_from_onboarded_create_rpc(self
, account_names
, create_msg
):
393 return (yield from self
.create_tasks(
395 create_msg
.image_id
if "image_id" in create_msg
else None,
396 create_msg
.image_name
if "image_name" in create_msg
else None,
397 create_msg
.image_checksum
if "image_checksum" in create_msg
else None)
401 class ImageManagerTasklet(rift
.tasklets
.Tasklet
):
403 The RwImageMgrTasklet provides a interface for DTS to interact with an
404 instance of the Monitor class. This allows the Monitor class to remain
408 def __init__(self
, *args
, **kwargs
):
409 super().__init
__(*args
, **kwargs
)
410 self
.rwlog
.set_category("rw-mano-log")
412 self
.cloud_cfg_subscriber
= None
413 self
.http_proxy
= None
414 self
.proxy_server
= None
416 self
.job_controller
= None
417 self
.cloud_accounts
= {}
418 self
.glance_client
= None
419 self
.task_creator
= None
420 self
.rpc_handler
= None
421 self
.show_handler
= None
425 self
.log
.info("Starting Image Manager Tasklet")
427 self
.log
.debug("Registering with dts")
428 self
.dts
= rift
.tasklets
.DTS(
430 RwImageMgmtYang
.get_schema(),
432 self
.on_dts_state_change
435 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
440 except Exception as e
:
441 self
.log
.exception(e
)
446 self
.log
.debug("creating cloud account handler")
447 self
.cloud_cfg_subscriber
= CloudAccountDtsHandler(self
.log
, self
.dts
, self
.log_hdl
)
448 self
.cloud_cfg_subscriber
.register(
449 self
.on_cloud_account_create
,
450 self
.on_cloud_account_delete
453 self
.log
.debug("creating http proxy server")
455 self
.http_proxy
= glance_proxy_server
.QuickProxyServer(self
.log
, self
.loop
)
457 self
.proxy_server
= glance_proxy_server
.GlanceHTTPProxyServer(
458 self
.log
, self
.loop
, self
.http_proxy
460 self
.proxy_server
.start()
462 self
.job_controller
= upload
.ImageUploadJobController(
466 self
.glance_client
= glance_client
.OpenstackGlanceClient
.from_token(
467 self
.log
, "127.0.0.1", "9292", "test"
470 self
.task_creator
= GlanceClientUploadTaskCreator(
471 self
.log
, self
.loop
, self
.cloud_accounts
, self
.glance_client
474 self
.rpc_handler
= ImageDTSRPCHandler(
475 self
.log
, self
.loop
, self
.dts
, self
.cloud_accounts
, self
.glance_client
, self
.task_creator
,
478 yield from self
.rpc_handler
.register()
480 self
.show_handler
= ImageDTSShowHandler(
481 self
.log
, self
.loop
, self
.dts
, self
.job_controller
483 yield from self
.show_handler
.register()
485 except Exception as e
:
486 self
.log
.exception("error during init")
488 def on_cloud_account_create(self
, account
):
489 self
.log
.debug("adding cloud account: %s", account
.name
)
490 self
.cloud_accounts
[account
.name
] = account
492 def on_cloud_account_delete(self
, account_name
):
493 self
.log
.debug("deleting cloud account: %s", account_name
)
494 if account_name
not in self
.cloud_accounts
:
495 self
.log
.warning("cloud account not found: %s", account_name
)
497 del self
.cloud_accounts
[account_name
]
503 def on_instance_started(self
):
504 self
.log
.debug("Got instance started callback")
507 def on_dts_state_change(self
, state
):
508 """Handle DTS state change
510 Take action according to current DTS state to transition application
511 into the corresponding application state
514 state - current dts state
518 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
519 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
523 rwdts
.State
.INIT
: self
.init
,
524 rwdts
.State
.RUN
: self
.run
,
527 # Transition application to next state
528 handler
= handlers
.get(state
, None)
529 if handler
is not None:
532 # Transition dts to next state
533 next_state
= switch
.get(state
, None)
534 if next_state
is not None:
535 self
.dts
.handle
.set_state(next_state
)