X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwimagemgr%2Frift%2Ftasklets%2Frwimagemgr%2Fupload.py;h=ed79f3d2c8606b2d5d9eb57109e4ca456256be90;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=c1716d3cd321b053924017d3faed985e85b91343;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/upload.py b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/upload.py index c1716d3c..ed79f3d2 100644 --- a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/upload.py +++ b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/upload.py @@ -51,9 +51,10 @@ class ImageUploadJobController(object): """ This class starts and manages ImageUploadJobs """ MAX_COMPLETED_JOBS = 20 - def __init__(self, log, loop, max_completed_jobs=MAX_COMPLETED_JOBS): - self._log = log - self._loop = loop + def __init__(self, project, max_completed_jobs=MAX_COMPLETED_JOBS): + self._log = project.log + self._loop = project.loop + self._project = project self._job_id_gen = itertools.count(1) self._max_completed_jobs = max_completed_jobs @@ -65,7 +66,7 @@ class ImageUploadJobController(object): @property def pb_msg(self): """ the UploadJobs protobuf message """ - upload_jobs_msg = RwImageMgmtYang.UploadJobs() + upload_jobs_msg = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs() for job in self._jobs.values(): upload_jobs_msg.job.append(job.pb_msg) @@ -210,7 +211,7 @@ class ImageUploadJob(object): @property def pb_msg(self): """ The UploadJob protobuf message """ - task = RwImageMgmtYang.UploadJob.from_dict({ + task = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job.from_dict({ "id": self._job_id, "status": self._state, "start_time": self._start_time, @@ -367,14 +368,17 @@ class UploadProgressWriteProxy(object): """ Start the rate monitoring task """ @asyncio.coroutine def periodic_rate_task(): - while True: - start_time = time.time() - start_bytes = self._bytes_written - yield from asyncio.sleep(1, loop=self._loop) - time_period = time.time() - start_time - num_bytes = self._bytes_written - start_bytes + try: + while True: + start_time = time.time() + start_bytes = self._bytes_written + yield from asyncio.sleep(1, loop=self._loop) + time_period = time.time() - start_time + num_bytes = self._bytes_written - start_bytes - self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period) + self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period) + except asyncio.CancelledError: + self._log.debug("rate monitoring task cancelled") self._log.debug("starting rate monitoring task") self._rate_task = self._loop.create_task(periodic_rate_task()) @@ -421,6 +425,9 @@ class GlanceImagePipeGen(object): self._write_hdl = os.fdopen(write_fd, 'wb') self._close_hdl = self._write_hdl + self._stop = False + self._t = None + @property def write_hdl(self): return self._write_hdl @@ -437,6 +444,9 @@ class GlanceImagePipeGen(object): self._log.debug("starting image data write to pipe") try: for data in self._data_gen: + if self._stop: + break + try: self._write_hdl.write(data) except (BrokenPipeError, ValueError) as e: @@ -458,9 +468,13 @@ class GlanceImagePipeGen(object): t.daemon = True t.start() + self._t = t + def stop(self): self._log.debug("stop requested, closing write side of pipe") - self._write_hdl.close() + self._stop = True + if self._t is not None: + self._t.join(timeout=1) class AccountImageUploadTask(object): @@ -543,7 +557,7 @@ class AccountImageUploadTask(object): @property def pb_msg(self): """ The UploadTask protobuf message """ - task = RwImageMgmtYang.UploadTask.from_dict({ + task = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job_UploadTasks.from_dict({ "cloud_account": self.cloud_account, "image_id": self.image_id, "image_name": self.image_name,