From 1ac189e7d842e8aa09133b769097e8703ec1352c Mon Sep 17 00:00:00 2001 From: aticig Date: Thu, 30 Jun 2022 19:29:04 +0300 Subject: [PATCH] Fix Bug 2086 Updating VNF status configurable This fix allows to set REFRESH_ACTIVE period as config option which periodically checks the VM status from VIM. Env variable can be set in the ro container export OSMRO_PERIOD_REFRESH_ACTIVE=-1 to disable VM status updates. This config parameter allowed to set >= 60 seconds or -1. Making the stage-releasenotes.sh executable. Change-Id: I90a859d9b831c1ce24faea4eab12dc998ac655d5 Signed-off-by: aticig --- NG-RO/osm_ng_ro/ns_thread.py | 114 +++++++-- NG-RO/osm_ng_ro/ro.cfg | 7 + NG-RO/osm_ng_ro/ro_main.py | 2 +- NG-RO/osm_ng_ro/tests/test_ns_thread.py | 230 ++++++++++++++++++ devops-stages/stage-releasenote.sh | 0 .../notes/fix_bug_2086-344f52762a42cdb2.yaml | 25 ++ 6 files changed, 358 insertions(+), 20 deletions(-) mode change 100644 => 100755 devops-stages/stage-releasenote.sh create mode 100644 releasenotes/notes/fix_bug_2086-344f52762a42cdb2.yaml diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 8cd7d2cb..7656a544 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -33,6 +33,7 @@ from shutil import rmtree import threading import time import traceback +from typing import Dict from unittest.mock import Mock from importlib_metadata import entry_points @@ -1472,18 +1473,41 @@ class VimInteractionResize(VimInteractionBase): return "FAILED", ro_vim_item_update, db_task_update -class NsWorker(threading.Thread): - REFRESH_BUILD = 5 # 5 seconds - REFRESH_ACTIVE = 60 # 1 minute - REFRESH_ERROR = 600 - REFRESH_IMAGE = 3600 * 10 - REFRESH_DELETE = 3600 * 10 - QUEUE_SIZE = 100 - terminate = False +class ConfigValidate: + def __init__(self, config: Dict): + self.conf = config + + @property + def active(self): + # default 1 min, allowed >= 60 or -1, -1 disables periodic checks + if ( + self.conf["period"]["refresh_active"] >= 60 + or self.conf["period"]["refresh_active"] == -1 + ): + return self.conf["period"]["refresh_active"] + + return 60 + + @property + def build(self): + return self.conf["period"]["refresh_build"] + + @property + def image(self): + return self.conf["period"]["refresh_image"] + + @property + def error(self): + return self.conf["period"]["refresh_error"] + @property + def queue_size(self): + return self.conf["period"]["queue_size"] + + +class NsWorker(threading.Thread): def __init__(self, worker_index, config, plugins, db): """ - :param worker_index: thread index :param config: general configuration of RO, among others the process_id with the docker id where it runs :param plugins: global shared dict with the loaded plugins @@ -1495,7 +1519,9 @@ class NsWorker(threading.Thread): self.plugin_name = "unknown" self.logger = logging.getLogger("ro.worker{}".format(worker_index)) self.worker_index = worker_index - self.task_queue = queue.Queue(self.QUEUE_SIZE) + # refresh periods for created items + self.refresh_config = ConfigValidate(config) + self.task_queue = queue.Queue(self.refresh_config.queue_size) # targetvim: vimplugin class self.my_vims = {} # targetvim: vim information from database @@ -1875,6 +1901,7 @@ class NsWorker(threading.Thread): "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], "locked_at.lt": now - self.task_locked_time, "to_check_at.lt": self.time_last_task_processed, + "to_check_at.gt": -1, }, update_dict={"locked_by": self.my_id, "locked_at": now}, fail_on_empty=False, @@ -2177,6 +2204,52 @@ class NsWorker(threading.Thread): return ro_task_dependency, task_index raise NsWorkerException("Cannot get depending task {}".format(task_id)) + def update_vm_refresh(self): + """Enables the VM status updates if self.refresh_config.active parameter + is not -1 and than updates the DB accordingly + + """ + try: + self.logger.debug("Checking if VM status update config") + next_refresh = time.time() + if self.refresh_config.active == -1: + next_refresh = -1 + else: + next_refresh += self.refresh_config.active + + if next_refresh != -1: + db_ro_task_update = {} + now = time.time() + next_check_at = now + (24 * 60 * 60) + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + db_ro_task_update["to_check_at"] = next_check_at + + self.logger.debug( + "Finding tasks which to be updated to enable VM status updates" + ) + refresh_tasks = self.db.get_list( + "ro_tasks", + q_filter={ + "tasks.status": "DONE", + "to_check_at.lt": 0, + }, + ) + self.logger.debug("Updating tasks to change the to_check_at status") + for task in refresh_tasks: + q_filter = { + "_id": task["_id"], + } + self.db.set_one( + "ro_tasks", + q_filter=q_filter, + update_dict=db_ro_task_update, + fail_on_empty=True, + ) + + except Exception as e: + self.logger.error(f"Error updating tasks to enable VM status updates: {e}") + def _process_pending_tasks(self, ro_task): ro_task_id = ro_task["_id"] now = time.time() @@ -2194,13 +2267,16 @@ class NsWorker(threading.Thread): next_refresh = time.time() if task["item"] in ("image", "flavor"): - next_refresh += self.REFRESH_IMAGE + next_refresh += self.refresh_config.image elif new_status == "BUILD": - next_refresh += self.REFRESH_BUILD + next_refresh += self.refresh_config.build elif new_status == "DONE": - next_refresh += self.REFRESH_ACTIVE + if self.refresh_config.active == -1: + next_refresh = -1 + else: + next_refresh += self.refresh_config.active else: - next_refresh += self.REFRESH_ERROR + next_refresh += self.refresh_config.error next_check_at = min(next_check_at, next_refresh) db_ro_task_update["vim_info.refresh_at"] = next_refresh @@ -2212,6 +2288,8 @@ class NsWorker(threading.Thread): if self.logger.getEffectiveLevel() == logging.DEBUG: self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") """ + # Check if vim status refresh is enabled again + self.update_vm_refresh() # 0: get task_status_create lock_object = None task_status_create = None @@ -2375,11 +2453,9 @@ class NsWorker(threading.Thread): # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) _update_refresh(new_status) else: - if ( - ro_task["vim_info"]["refresh_at"] - and now > ro_task["vim_info"]["refresh_at"] - ): - new_status, db_vim_info_update = self.item2class[ + refresh_at = ro_task["vim_info"]["refresh_at"] + if refresh_at and refresh_at != -1 and now > refresh_at: + (new_status, db_vim_info_update,) = self.item2class[ task["item"] ].refresh(ro_task) _update_refresh(new_status) diff --git a/NG-RO/osm_ng_ro/ro.cfg b/NG-RO/osm_ng_ro/ro.cfg index 6969702a..e232ad1c 100644 --- a/NG-RO/osm_ng_ro/ro.cfg +++ b/NG-RO/osm_ng_ro/ro.cfg @@ -62,6 +62,13 @@ task_locked_time: 300 task_max_locked_time: 1200 # lock is renewed until this maximum time task_relock_time: 15 # 30s before expiring lock time, it is re-locked again +[period] +# use env for OSMRO_PERIOD_XXX +refresh_active: 60 # default 1 min +refresh_build: 15 # default 15 seconds +refresh_image: 3600 * 10 +refresh_error: 600 +queue_size: 100 [database] # use env OSMRO_DATABASE_XXX to override diff --git a/NG-RO/osm_ng_ro/ro_main.py b/NG-RO/osm_ng_ro/ro_main.py index 2a2e22cf..376c087e 100644 --- a/NG-RO/osm_ng_ro/ro_main.py +++ b/NG-RO/osm_ng_ro/ro_main.py @@ -854,7 +854,7 @@ def _start_service(): elif k1 == "tools": # update [/] configuration engine_config["/"]["tools." + k2.replace("_", ".")] = yaml.safe_load(v) - elif k1 in ("message", "database", "storage", "authentication"): + elif k1 in ("message", "database", "storage", "authentication", "period"): engine_config[k1][k2] = yaml.safe_load(v) except Exception as e: diff --git a/NG-RO/osm_ng_ro/tests/test_ns_thread.py b/NG-RO/osm_ng_ro/tests/test_ns_thread.py index 86ba996e..b1209015 100644 --- a/NG-RO/osm_ng_ro/tests/test_ns_thread.py +++ b/NG-RO/osm_ng_ro/tests/test_ns_thread.py @@ -19,7 +19,10 @@ import logging import unittest from unittest.mock import MagicMock, patch +from osm_common.dbmemory import DbMemory from osm_ng_ro.ns_thread import ( + ConfigValidate, + NsWorker, VimInteractionAffinityGroup, VimInteractionMigration, VimInteractionNet, @@ -28,6 +31,233 @@ from osm_ng_ro.ns_thread import ( from osm_ro_plugin.vimconn import VimConnConnectionException, VimConnException +class TestConfigValidate(unittest.TestCase): + def setUp(self): + self.config_dict = { + "period": { + "refresh_active": 65, + "refresh_build": 20, + "refresh_image": 3600, + "refresh_error": 300, + "queue_size": 50, + } + } + + def test__get_configuration(self): + with self.subTest(i=1, t="Get config attributes with config input"): + configuration = ConfigValidate(self.config_dict) + self.assertEqual(configuration.active, 65) + self.assertEqual(configuration.build, 20) + self.assertEqual(configuration.image, 3600) + self.assertEqual(configuration.error, 300) + self.assertEqual(configuration.queue_size, 50) + + with self.subTest(i=2, t="Unallowed refresh active input"): + # > 60 (except -1) is not allowed to set, so it should return default value 60 + self.config_dict["period"]["refresh_active"] = 20 + configuration = ConfigValidate(self.config_dict) + self.assertEqual(configuration.active, 60) + + with self.subTest(i=3, t="Config to disable VM status periodic checks"): + # -1 is allowed to set to disable VM status updates + self.config_dict["period"]["refresh_active"] = -1 + configuration = ConfigValidate(self.config_dict) + self.assertEqual(configuration.active, -1) + + +class TestNsWorker(unittest.TestCase): + def setUp(self): + self.task_depends = None + self.plugins = {} + self.worker_index = "worker-3" + self.config = { + "period": { + "refresh_active": 60, + "refresh_build": 20, + "refresh_image": 3600, + "refresh_error": 600, + "queue_size": 100, + }, + "process_id": "343435353", + "global": {"task_locked_time": 16373242100.994312}, + } + + self.ro_task = { + "_id": "122436:1", + "locked_by": None, + "locked_at": 0.0, + "target_id": "vim_openstack_1", + "vim_info": { + "created": False, + "created_items": None, + "vim_id": "test-vim-id", + "vim_name": "test-vim", + "vim_status": "DONE", + "vim_details": "", + "vim_message": None, + "refresh_at": None, + }, + "modified_at": 1637324200.994312, + "created_at": 1637324200.994312, + "to_check_at": 16373242400.994312, + "tasks": [ + { + "target_id": 0, + "action_id": "123456", + "nsr_id": "654321", + "task_id": "123456:1", + "status": "DONE", + "action": "CREATE", + "item": "test_item", + "target_record": "test_target_record", + "target_record_id": "test_target_record_id", + }, + ], + } + + def get_disabled_tasks(self, db, status): + db_disabled_tasks = db.get_list( + "ro_tasks", + q_filter={ + "tasks.status": status, + "to_check_at.lt": 0, + }, + ) + return db_disabled_tasks + + def test__update_vm_refresh(self): + with self.subTest( + i=1, + t="1 disabled task with status BUILD in DB, refresh_active parameter is not equal to -1", + ): + # Disabled task with status build will not enabled again + db = DbMemory() + self.ro_task["tasks"][0]["status"] = "BUILD" + self.ro_task["to_check_at"] = -1 + db.create("ro_tasks", self.ro_task) + disabled_tasks_count = len(self.get_disabled_tasks(db, "BUILD")) + instance = NsWorker(self.worker_index, self.config, self.plugins, db) + with patch.object(instance, "logger", logging): + instance.update_vm_refresh() + self.assertEqual( + len(self.get_disabled_tasks(db, "BUILD")), disabled_tasks_count + ) + + with self.subTest( + i=2, + t="1 disabled task with status DONE in DB, refresh_active parameter is equal to -1", + ): + # As refresh_active parameter is equal to -1, task will not be enabled to process again + db = DbMemory() + self.config["period"]["refresh_active"] = -1 + self.ro_task["tasks"][0]["status"] = "DONE" + self.ro_task["to_check_at"] = -1 + db.create("ro_tasks", self.ro_task) + disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE")) + instance = NsWorker(self.worker_index, self.config, self.plugins, db) + with patch.object(instance, "logger", logging): + instance.update_vm_refresh() + self.assertEqual( + len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count + ) + + with self.subTest( + i=3, + t="2 disabled task with status DONE in DB, refresh_active parameter is not equal to -1", + ): + # Disabled tasks should be enabled to process again + db = DbMemory() + self.config["period"]["refresh_active"] = 66 + self.ro_task["tasks"][0]["status"] = "DONE" + self.ro_task["to_check_at"] = -1 + db.create("ro_tasks", self.ro_task) + self.ro_task2 = self.ro_task + self.ro_task2["_id"] = "122437:1" + db.create("ro_tasks", self.ro_task2) + disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE")) + instance = NsWorker(self.worker_index, self.config, self.plugins, db) + with patch.object(instance, "logger", logging): + instance.update_vm_refresh() + self.assertEqual( + len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count - 2 + ) + + with self.subTest( + i=4, + t="No disabled task with status DONE in DB, refresh_active parameter is not equal to -1", + ): + # If there is not any disabled task, method will not change anything + db = DbMemory() + self.config["period"]["refresh_active"] = 66 + self.ro_task["tasks"][0]["status"] = "DONE" + self.ro_task["to_check_at"] = 16373242400.994312 + db.create("ro_tasks", self.ro_task) + self.ro_task2 = self.ro_task + self.ro_task2["_id"] = "122437:1" + db.create("ro_tasks", self.ro_task2) + disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE")) + instance = NsWorker(self.worker_index, self.config, self.plugins, db) + with patch.object(instance, "logger", logging): + instance.update_vm_refresh() + self.assertEqual( + len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count + ) + + def test__process_pending_tasks(self): + with self.subTest( + i=1, + t="refresh_active parameter is equal to -1, task status is DONE", + ): + # Task should be disabled to process again + db = DbMemory() + self.config["period"]["refresh_active"] = -1 + self.ro_task["tasks"][0]["status"] = "DONE" + self.ro_task["to_check_at"] = 16373242400.994312 + db.create("ro_tasks", self.ro_task) + # Number of disabled tasks in DB + disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE")) + instance = NsWorker(self.worker_index, self.config, self.plugins, db) + with patch.object(instance, "logger", logging): + instance._process_pending_tasks(self.ro_task) + self.assertEqual( + len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count + 1 + ) + + with self.subTest( + i=2, t="refresh_active parameter is equal to -1, task status is FAILED" + ): + # Task will not be disabled to process as task status is not DONE + db = DbMemory() + self.config["period"]["refresh_active"] = -1 + self.ro_task["tasks"][0]["status"] = "FAILED" + self.ro_task["to_check_at"] = 16373242400.994312 + db.create("ro_tasks", self.ro_task) + disabled_tasks_count = len(self.get_disabled_tasks(db, "FAILED")) + instance = NsWorker(self.worker_index, self.config, self.plugins, db) + with patch.object(instance, "logger", logging): + instance._process_pending_tasks(self.ro_task) + self.assertEqual( + len(self.get_disabled_tasks(db, "FAILED")), disabled_tasks_count + ) + + with self.subTest( + i=3, t="refresh_active parameter is not equal to -1, task status is DONE" + ): + # Task will not be disabled to process as refresh_active parameter is not -1 + db = DbMemory() + self.config["period"]["refresh_active"] = 70 + self.ro_task["tasks"][0]["status"] = "DONE" + self.ro_task["to_check_at"] = 16373242400.994312 + db.create("ro_tasks", self.ro_task) + disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE")) + instance = NsWorker(self.worker_index, self.config, self.plugins, db) + with patch.object(instance, "logger", logging): + instance._process_pending_tasks(self.ro_task) + self.assertEqual( + len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count + ) + + class TestVimInteractionNet(unittest.TestCase): def setUp(self): module_name = "osm_ro_plugin" diff --git a/devops-stages/stage-releasenote.sh b/devops-stages/stage-releasenote.sh old mode 100644 new mode 100755 diff --git a/releasenotes/notes/fix_bug_2086-344f52762a42cdb2.yaml b/releasenotes/notes/fix_bug_2086-344f52762a42cdb2.yaml new file mode 100644 index 00000000..186465e5 --- /dev/null +++ b/releasenotes/notes/fix_bug_2086-344f52762a42cdb2.yaml @@ -0,0 +1,25 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +--- +fixes: + - | + Fix Bug 2086 Updating VNF status configurable + This fix allows to set REFRESH_ACTIVE period as config option which + periodically checks the VM status from VIM. Env variable can be set in the + ro container export OSMRO_PERIOD_REFRESH_ACTIVE=-1 to disable VM status updates. + This config parameter allowed to set >= 60 seconds or -1. + -- 2.25.1