update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / upload.py
index c1716d3..ed79f3d 100644 (file)
@@ -51,9 +51,10 @@ class ImageUploadJobController(object):
     """ This class starts and manages ImageUploadJobs """
     MAX_COMPLETED_JOBS = 20
 
-    def __init__(self, log, loop, max_completed_jobs=MAX_COMPLETED_JOBS):
-        self._log = log
-        self._loop = loop
+    def __init__(self, project, max_completed_jobs=MAX_COMPLETED_JOBS):
+        self._log = project.log
+        self._loop = project.loop
+        self._project = project
         self._job_id_gen = itertools.count(1)
         self._max_completed_jobs = max_completed_jobs
 
@@ -65,7 +66,7 @@ class ImageUploadJobController(object):
     @property
     def pb_msg(self):
         """ the UploadJobs protobuf message """
-        upload_jobs_msg = RwImageMgmtYang.UploadJobs()
+        upload_jobs_msg = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs()
         for job in self._jobs.values():
             upload_jobs_msg.job.append(job.pb_msg)
 
@@ -210,7 +211,7 @@ class ImageUploadJob(object):
     @property
     def pb_msg(self):
         """ The UploadJob protobuf message """
-        task = RwImageMgmtYang.UploadJob.from_dict({
+        task = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job.from_dict({
             "id": self._job_id,
             "status": self._state,
             "start_time": self._start_time,
@@ -367,14 +368,17 @@ class UploadProgressWriteProxy(object):
         """ Start the rate monitoring task """
         @asyncio.coroutine
         def periodic_rate_task():
-            while True:
-                start_time = time.time()
-                start_bytes = self._bytes_written
-                yield from asyncio.sleep(1, loop=self._loop)
-                time_period = time.time() - start_time
-                num_bytes = self._bytes_written - start_bytes
+            try:
+                while True:
+                    start_time = time.time()
+                    start_bytes = self._bytes_written
+                    yield from asyncio.sleep(1, loop=self._loop)
+                    time_period = time.time() - start_time
+                    num_bytes = self._bytes_written - start_bytes
 
-                self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)
+                    self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)
+            except asyncio.CancelledError:
+                self._log.debug("rate monitoring task cancelled")
 
         self._log.debug("starting rate monitoring task")
         self._rate_task = self._loop.create_task(periodic_rate_task())
@@ -421,6 +425,9 @@ class GlanceImagePipeGen(object):
         self._write_hdl = os.fdopen(write_fd, 'wb')
         self._close_hdl = self._write_hdl
 
+        self._stop = False
+        self._t = None
+
     @property
     def write_hdl(self):
         return self._write_hdl
@@ -437,6 +444,9 @@ class GlanceImagePipeGen(object):
         self._log.debug("starting image data write to pipe")
         try:
             for data in self._data_gen:
+                if self._stop:
+                    break
+
                 try:
                     self._write_hdl.write(data)
                 except (BrokenPipeError, ValueError) as e:
@@ -458,9 +468,13 @@ class GlanceImagePipeGen(object):
         t.daemon = True
         t.start()
 
+        self._t = t
+
     def stop(self):
         self._log.debug("stop requested, closing write side of pipe")
-        self._write_hdl.close()
+        self._stop = True
+        if self._t is not None:
+            self._t.join(timeout=1)
 
 
 class AccountImageUploadTask(object):
@@ -543,7 +557,7 @@ class AccountImageUploadTask(object):
     @property
     def pb_msg(self):
         """ The UploadTask protobuf message """
-        task = RwImageMgmtYang.UploadTask.from_dict({
+        task = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job_UploadTasks.from_dict({
             "cloud_account": self.cloud_account,
             "image_id": self.image_id,
             "image_name": self.image_name,