import rift.tasklets
import rift.mano.cloud
+from rift.mano.utils.project import (
+ ManoProject,
+ ProjectConfigCallbacks,
+ ProjectHandler,
+ get_add_delete_update_cfgs,
+ DEFAULT_PROJECT,
+ )
from . import glance_proxy_server
from . import glance_client
class CloudAccountDtsHandler(object):
- def __init__(self, log, dts, log_hdl):
+ def __init__(self, log, dts, log_hdl, project):
self._dts = dts
self._log = log
self._log_hdl = log_hdl
self._cloud_cfg_subscriber = None
+ self._project = project
+ @asyncio.coroutine
def register(self, on_add_apply, on_delete_apply):
- self._log.debug("creating cloud account config handler")
+ self._log.debug("Project {}: creating cloud account config handler".
+ format(self._project.name))
self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
- self._dts, self._log, self._log_hdl,
+ self._dts, self._log, self._log_hdl, self._project,
rift.mano.cloud.CloudAccountConfigCallbacks(
on_add_apply=on_add_apply,
on_delete_apply=on_delete_apply,
)
)
- self._cloud_cfg_subscriber.register()
+ yield from self._cloud_cfg_subscriber.register()
+
+ def deregister(self):
+ self._log.debug("Project {}: Removing cloud account config handler".
+ format(self._project.name))
+ self._cloud_cfg_subscriber.deregister()
def openstack_image_to_image_info(openstack_image):
A ImageInfo CAL protobuf message
"""
- image_info = RwcalYang.ImageInfoItem()
+ image_info = RwcalYang.YangData_RwProject_Project_VimResources_ImageinfoList()
copy_fields = ["id", "name", "checksum", "container_format", "disk_format"]
for field in copy_fields:
value = getattr(openstack_image, field)
setattr(image_info, field, value)
+ value = getattr(openstack_image, "properties")
+ for key in value:
+ prop = image_info.properties.add()
+ prop.name = key
+ prop.property_value = value[key]
+
image_info.state = openstack_image.status
return image_info
class ImageDTSShowHandler(object):
""" A DTS publisher for the upload-jobs data container """
- def __init__(self, log, loop, dts, job_controller):
- self._log = log
- self._loop = loop
- self._dts = dts
+ def __init__(self, project, job_controller):
+ self._log = project.log
+ self._loop = project.loop
+ self._dts = project.dts
self._job_controller = job_controller
+ self._project = project
self._subscriber = None
+ def get_xpath(self):
+ return self._project.add_project("D,/rw-image-mgmt:upload-jobs")
+
@asyncio.coroutine
def register(self):
""" Register as a publisher and wait for reg_ready to complete """
- def get_xpath():
- return "D,/rw-image-mgmt:upload-jobs"
@asyncio.coroutine
def on_prepare(xact_info, action, ks_path, msg):
xact_info.respond_xpath(
rwdts.XactRspCode.ACK,
- xpath=get_xpath(),
+ xpath=self.get_xpath(),
msg=jobs_pb_msg,
)
reg_event.set()
self._subscriber = yield from self._dts.register(
- xpath=get_xpath(),
+ xpath=self.get_xpath(),
handler=rift.tasklets.DTS.RegistrationHandler(
on_prepare=on_prepare,
on_ready=on_ready,
yield from reg_event.wait()
+ def deregister(self):
+ self._log.debug("Project {}: De-register show image handler".
+ format(self._project.name))
+ if self._subscriber:
+ self._subscriber.delete_element(self.get_xpath())
+ self._subscriber.deregister()
+ self._subscriber = None
+
class ImageDTSRPCHandler(object):
""" A DTS publisher for the upload-job RPC's """
- def __init__(self, log, loop, dts, accounts, glance_client, upload_task_creator, job_controller):
- self._log = log
- self._loop = loop
- self._dts = dts
- self._accounts = accounts
+ def __init__(self, project, glance_client, upload_task_creator, job_controller):
+ self._log = project.log
+ self._loop = project.loop
+ self._dts = project.dts
self._glance_client = glance_client
self._upload_task_creator = upload_task_creator
self._job_controller = job_controller
+ self._project = project
- self._subscriber = None
+ self._create = None
+ self._cancel = None
+
+ @property
+ def accounts(self):
+ return self._project.cloud_accounts
@asyncio.coroutine
def _register_create_upload_job(self):
create_msg = msg
account_names = create_msg.cloud_account
+
+ self._log.debug("Create upload job msg: {} ".format(msg.as_dict()))
+
+ if not self._project.rpc_check(msg, xact_info):
+ return
+
# If cloud accounts were not specified, upload image to all cloud account
if not account_names:
- account_names = list(self._accounts.keys())
+ account_names = list(self.accounts.keys())
- for account_name in account_names:
- if account_name not in self._accounts:
- raise AccountNotFoundError("Could not find account %s", account_name)
+ else:
+ for account_name in account_names:
+ if account_name not in self.accounts:
+ raise AccountNotFoundError("Could not find account %s", account_name)
if create_msg.has_field("external_url"):
glance_image = yield from self._upload_task_creator.create_glance_image_from_url_create_rpc(
)
elif create_msg.has_field("onboarded_image"):
+ self._log.debug("onboarded_image {} to accounts {}".
+ format(create_msg.onboarded_image, account_names))
tasks = yield from self._upload_task_creator.create_tasks_from_onboarded_create_rpc(
account_names, create_msg.onboarded_image
)
else:
raise ImageRequestError("an image selection must be provided")
- rpc_out_msg = RwImageMgmtYang.CreateUploadJobOutput(job_id=job_id)
+ rpc_out_msg = RwImageMgmtYang.YangOutput_RwImageMgmt_CreateUploadJob(job_id=job_id)
xact_info.respond_xpath(
rwdts.XactRspCode.ACK,
def on_ready(_, status):
reg_event.set()
- self._subscriber = yield from self._dts.register(
- xpath="I," + get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare,
- on_ready=on_ready,
- ),
- flags=rwdts.Flag.PUBLISHER,
- )
+ self._create = yield from self._dts.register(
+ xpath="I," + get_xpath(),
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare,
+ on_ready=on_ready,
+ ),
+ flags=rwdts.Flag.PUBLISHER,
+ )
yield from reg_event.wait()
@asyncio.coroutine
def on_prepare(xact_info, action, ks_path, msg):
+ if not self._project.rpc_check(msg, xact_info):
+ return
+
if not msg.has_field("job_id"):
self._log.error("cancel-upload-job missing job-id field.")
xact_info.respond_xpath(rwdts.XactRspCode.NACK)
def on_ready(_, status):
reg_event.set()
- self._subscriber = yield from self._dts.register(
- xpath="I," + get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare,
- on_ready=on_ready,
- ),
- flags=rwdts.Flag.PUBLISHER,
- )
+ self._cancel = yield from self._dts.register(
+ xpath="I," + get_xpath(),
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare,
+ on_ready=on_ready,
+ ),
+ flags=rwdts.Flag.PUBLISHER,
+ )
yield from reg_event.wait()
yield from self._register_create_upload_job()
yield from self._register_cancel_upload_job()
+ def deregister(self):
+ self._log.debug("Project {}: Deregister image rpc handlers".
+ format(self._project.name))
+ if self._create:
+ self._create.deregister()
+ self._create = None
+
+ if self._cancel:
+ self._cancel.deregister()
+ self._cancel = None
+
class GlanceClientUploadTaskCreator(object):
""" This class creates upload tasks using configured cloud accounts and
configured image catalog glance client """
- def __init__(self, log, loop, accounts, glance_client):
- self._log = log
- self._loop = loop
- self._accounts = accounts
+ def __init__(self, project, glance_client):
+ self._log = project.log
+ self._loop = project.loop
self._glance_client = glance_client
+ self._project = project
+
+ @property
+ def accounts(self):
+ return self._project.cloud_accounts
@asyncio.coroutine
def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None):
tasks = []
for account_name in account_names:
- if account_name not in self._accounts:
+ if account_name not in self.accounts:
raise AccountNotFoundError("Could not find account %s", account_name)
# For each account name provided, create a pipe (GlanceImagePipeGen)
# which feeds data into the UploadTask while also monitoring the various
# transmit stats (progress, bytes written, bytes per second, etc)
for account_name in account_names:
- account = self._accounts[account_name]
+ account = self.accounts[account_name]
self._log.debug("creating task for account %s", account.name)
glance_data_gen = self._glance_client.get_image_data(image_info.id)
create_msg.image_checksum if "image_checksum" in create_msg else None)
)
+class ImageMgrProject(ManoProject):
+
+ def __init__(self, name, tasklet, **kw):
+ super(ImageMgrProject, self).__init__(tasklet.log, name)
+ self.update(tasklet)
+ try:
+ self.glance_client = kw['client']
+ except KeyError as e:
+ self._log.exception("kw {}: {}".format(kw, e))
+
+ self.cloud_cfg_subscriber = None
+ self.job_controller = None
+ self.task_creator = None
+ self.rpc_handler = None
+ self.show_handler = None
+
+ self.cloud_accounts = {}
+
+ @asyncio.coroutine
+ def register(self):
+ try:
+ self.log.debug("creating cloud account handler")
+ self.cloud_cfg_subscriber = CloudAccountDtsHandler(self._log,
+ self._dts,
+ self._log_hdl,
+ self)
+ yield from self.cloud_cfg_subscriber.register(
+ self.on_cloud_account_create,
+ self.on_cloud_account_delete
+ )
+
+ self.job_controller = upload.ImageUploadJobController(
+ self
+ )
+
+ self.task_creator = GlanceClientUploadTaskCreator(
+ self, self.glance_client,
+ )
+
+ self.rpc_handler = ImageDTSRPCHandler(
+ self, self.glance_client, self.task_creator,
+ self.job_controller,
+ )
+ yield from self.rpc_handler.register()
+
+ self.show_handler = ImageDTSShowHandler(
+ self, self.job_controller,
+ )
+ yield from self.show_handler.register()
+ except Exception as e:
+ self.log.exception("Error during project {} register: e".
+ format(self.name, e))
+
+ def deregister(self):
+ self.log.debug("De-register handlers for project: {}".format(self.name))
+ self.rpc_handler.deregister()
+ self.show_handler.deregister()
+ self.cloud_cfg_subscriber.deregister()
+
+ def on_cloud_account_create(self, account):
+ self.log.debug("adding cloud account: %s", account.name)
+ self.cloud_accounts[account.name] = account
+
+ def on_cloud_account_delete(self, account_name):
+ self.log.debug("deleting cloud account: %s", account_name)
+ if account_name not in self.cloud_accounts:
+ self.log.warning("cloud account not found: %s", account_name)
+ else:
+ del self.cloud_accounts[account_name]
class ImageManagerTasklet(rift.tasklets.Tasklet):
"""
super().__init__(*args, **kwargs)
self.rwlog.set_category("rw-mano-log")
- self.cloud_cfg_subscriber = None
self.http_proxy = None
self.proxy_server = None
self.dts = None
- self.job_controller = None
- self.cloud_accounts = {}
self.glance_client = None
- self.task_creator = None
- self.rpc_handler = None
- self.show_handler = None
+ self.project_handler = None
+
+ self.projects = {}
def start(self):
super().start()
@asyncio.coroutine
def init(self):
try:
- self.log.debug("creating cloud account handler")
- self.cloud_cfg_subscriber = CloudAccountDtsHandler(self.log, self.dts, self.log_hdl)
- self.cloud_cfg_subscriber.register(
- self.on_cloud_account_create,
- self.on_cloud_account_delete
- )
-
self.log.debug("creating http proxy server")
self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop)
)
self.proxy_server.start()
- self.job_controller = upload.ImageUploadJobController(
- self.log, self.loop
- )
-
self.glance_client = glance_client.OpenstackGlanceClient.from_token(
self.log, "127.0.0.1", "9292", "test"
)
- self.task_creator = GlanceClientUploadTaskCreator(
- self.log, self.loop, self.cloud_accounts, self.glance_client
- )
-
- self.rpc_handler = ImageDTSRPCHandler(
- self.log, self.loop, self.dts, self.cloud_accounts, self.glance_client, self.task_creator,
- self.job_controller
- )
- yield from self.rpc_handler.register()
-
- self.show_handler = ImageDTSShowHandler(
- self.log, self.loop, self.dts, self.job_controller
- )
- yield from self.show_handler.register()
+ self.log.debug("Creating project handler")
+ self.project_handler = ProjectHandler(self, ImageMgrProject,
+ client=self.glance_client)
+ self.project_handler.register()
except Exception as e:
self.log.exception("error during init")
- def on_cloud_account_create(self, account):
- self.log.debug("adding cloud account: %s", account.name)
- self.cloud_accounts[account.name] = account
-
- def on_cloud_account_delete(self, account_name):
- self.log.debug("deleting cloud account: %s", account_name)
- if account_name not in self.cloud_accounts:
- self.log.warning("cloud account not found: %s", account_name)
-
- del self.cloud_accounts[account_name]
-
@asyncio.coroutine
def run(self):
pass