""" This class starts and manages ImageUploadJobs """
MAX_COMPLETED_JOBS = 20
- def __init__(self, log, loop, max_completed_jobs=MAX_COMPLETED_JOBS):
- self._log = log
- self._loop = loop
+ def __init__(self, project, max_completed_jobs=MAX_COMPLETED_JOBS):
+ self._log = project.log
+ self._loop = project.loop
+ self._project = project
self._job_id_gen = itertools.count(1)
self._max_completed_jobs = max_completed_jobs
@property
def pb_msg(self):
""" the UploadJobs protobuf message """
- upload_jobs_msg = RwImageMgmtYang.UploadJobs()
+ upload_jobs_msg = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs()
for job in self._jobs.values():
upload_jobs_msg.job.append(job.pb_msg)
@property
def pb_msg(self):
""" The UploadJob protobuf message """
- task = RwImageMgmtYang.UploadJob.from_dict({
+ task = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job.from_dict({
"id": self._job_id,
"status": self._state,
"start_time": self._start_time,
if failed_tasks:
self._log.error("%s had %s FAILED tasks.", self, len(failed_tasks))
+ for ftask in failed_tasks:
+ self._log.error("%s : Failed to upload image : %s to cloud_account : %s", self, ftask.image_name, ftask.cloud_account)
self.state = "FAILED"
else:
self._log.debug("%s tasks completed successfully", len(self._upload_tasks))
""" Start the rate monitoring task """
@asyncio.coroutine
def periodic_rate_task():
- while True:
- start_time = time.time()
- start_bytes = self._bytes_written
- yield from asyncio.sleep(1, loop=self._loop)
- time_period = time.time() - start_time
- num_bytes = self._bytes_written - start_bytes
+ try:
+ while True:
+ start_time = time.time()
+ start_bytes = self._bytes_written
+ yield from asyncio.sleep(1, loop=self._loop)
+ time_period = time.time() - start_time
+ num_bytes = self._bytes_written - start_bytes
- self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)
+ self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)
+ except asyncio.CancelledError:
+ self._log.debug("rate monitoring task cancelled")
self._log.debug("starting rate monitoring task")
self._rate_task = self._loop.create_task(periodic_rate_task())
self._write_hdl = os.fdopen(write_fd, 'wb')
self._close_hdl = self._write_hdl
+ self._stop = False
+ self._t = None
+
@property
def write_hdl(self):
return self._write_hdl
self._log.debug("starting image data write to pipe")
try:
for data in self._data_gen:
+ if self._stop:
+ break
+
try:
self._write_hdl.write(data)
except (BrokenPipeError, ValueError) as e:
t.daemon = True
t.start()
+ self._t = t
+
def stop(self):
self._log.debug("stop requested, closing write side of pipe")
- self._write_hdl.close()
+ self._stop = True
+ if self._t is not None:
+ self._t.join(timeout=1)
class AccountImageUploadTask(object):
@property
def pb_msg(self):
""" The UploadTask protobuf message """
- task = RwImageMgmtYang.UploadTask.from_dict({
+ task = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job_UploadTasks.from_dict({
"cloud_account": self.cloud_account,
"image_id": self.image_id,
"image_name": self.image_name,