+class LockRenew:
+
+ renew_list = []
+ # ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
+ # be renewed. The time order is achieved as it is appended at the end
+
+ def __init__(self, config, logger):
+ """
+ Constructor of class
+ :param config: configuration parameters of database and messaging
+ """
+ self.config = config
+ self.logger = logger
+ self.to_terminate = False
+ self.loop = None
+ self.db = None
+ self.task_locked_time = config["global"]["task_locked_time"]
+ self.task_relock_time = config["global"]["task_relock_time"]
+ self.task_max_locked_time = config["global"]["task_max_locked_time"]
+
+ def start(self, db, loop):
+ self.db = db
+ self.loop = loop
+
+ @staticmethod
+ def add_lock_object(database_table, database_object, thread_object):
+ """
+ Insert a task to renew the locking
+ :param database_table: database collection where to maintain the lock
+ :param database_object: database object. '_id' and 'locked_at' are mandatory keys
+ :param thread_object: Thread object that has locked to check if it is alive
+ :return: a locked_object needed for calling remove_lock_object. It will contain uptodya database 'locked_at'
+ """
+ lock_object = {
+ "table": database_table,
+ "_id": database_object["_id"],
+ "initial_lock_time": database_object["locked_at"],
+ "locked_at": database_object["locked_at"],
+ "thread": thread_object,
+ "unlocked": False # True when it is not needed any more
+ }
+ LockRenew.renew_list.append(lock_object)
+ return lock_object
+
+ @staticmethod
+ def remove_lock_object(lock_object):
+ lock_object["unlocked"] = True
+
+ async def renew_locks(self):
+ while not self.to_terminate:
+ if not self.renew_list:
+ await asyncio.sleep(self.task_locked_time - self.task_relock_time, loop=self.loop)
+ continue
+ lock_object = self.renew_list[0]
+ if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive():
+ # task has been finished or locker thread is dead, not needed to re-locked.
+ self.renew_list.pop(0)
+ continue
+
+ locked_at = lock_object["locked_at"]
+ now = time()
+ time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
+ if time_to_relock < 1:
+ if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
+ self.renew_list.pop(0)
+ # re-lock
+ new_locked_at = locked_at + self.task_locked_time
+ try:
+ if self.db.set_one(lock_object["table"],
+ update_dict={"locked_at": new_locked_at, "modified_at": now},
+ q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+ fail_on_empty=False):
+ self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
+ lock_object["locked_at"] = new_locked_at
+ self.renew_list.append(lock_object)
+ else:
+ self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
+ lock_object["_id"]))
+ except Exception as e:
+ self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
+ lock_object["table"], lock_object["_id"], e))
+ else:
+ # wait until it is time to re-lock it
+ await asyncio.sleep(time_to_relock, loop=self.loop)
+
+ def stop(self):
+ # unlock all locked items
+ now = time()
+ for lock_object in self.renew_list:
+ locked_at = lock_object["locked_at"]
+ if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
+ self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
+ q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
+ fail_on_empty=False)
+
+