Fix Bug 2086 Updating VNF status configurable
This fix allows to set REFRESH_ACTIVE period as config option which
periodically checks the VM status from VIM. Env variable can be set in the
ro container export OSMRO_PERIOD_REFRESH_ACTIVE=-1 to disable VM status updates.
This config parameter allowed to set >= 60 seconds or -1.
Making the stage-releasenotes.sh executable and fix the syntax errors in stage-build.sh.
Change-Id: I90a859d9b831c1ce24faea4eab12dc998ac655d5
Signed-off-by: aticig <gulsum.atici@canonical.com>
diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py
index 8cd7d2c..7656a54 100644
--- a/NG-RO/osm_ng_ro/ns_thread.py
+++ b/NG-RO/osm_ng_ro/ns_thread.py
@@ -33,6 +33,7 @@
import threading
import time
import traceback
+from typing import Dict
from unittest.mock import Mock
from importlib_metadata import entry_points
@@ -1472,18 +1473,41 @@
return "FAILED", ro_vim_item_update, db_task_update
-class NsWorker(threading.Thread):
- REFRESH_BUILD = 5 # 5 seconds
- REFRESH_ACTIVE = 60 # 1 minute
- REFRESH_ERROR = 600
- REFRESH_IMAGE = 3600 * 10
- REFRESH_DELETE = 3600 * 10
- QUEUE_SIZE = 100
- terminate = False
+class ConfigValidate:
+ def __init__(self, config: Dict):
+ self.conf = config
+ @property
+ def active(self):
+ # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
+ if (
+ self.conf["period"]["refresh_active"] >= 60
+ or self.conf["period"]["refresh_active"] == -1
+ ):
+ return self.conf["period"]["refresh_active"]
+
+ return 60
+
+ @property
+ def build(self):
+ return self.conf["period"]["refresh_build"]
+
+ @property
+ def image(self):
+ return self.conf["period"]["refresh_image"]
+
+ @property
+ def error(self):
+ return self.conf["period"]["refresh_error"]
+
+ @property
+ def queue_size(self):
+ return self.conf["period"]["queue_size"]
+
+
+class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
"""
-
:param worker_index: thread index
:param config: general configuration of RO, among others the process_id with the docker id where it runs
:param plugins: global shared dict with the loaded plugins
@@ -1495,7 +1519,9 @@
self.plugin_name = "unknown"
self.logger = logging.getLogger("ro.worker{}".format(worker_index))
self.worker_index = worker_index
- self.task_queue = queue.Queue(self.QUEUE_SIZE)
+ # refresh periods for created items
+ self.refresh_config = ConfigValidate(config)
+ self.task_queue = queue.Queue(self.refresh_config.queue_size)
# targetvim: vimplugin class
self.my_vims = {}
# targetvim: vim information from database
@@ -1875,6 +1901,7 @@
"tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
"locked_at.lt": now - self.task_locked_time,
"to_check_at.lt": self.time_last_task_processed,
+ "to_check_at.gt": -1,
},
update_dict={"locked_by": self.my_id, "locked_at": now},
fail_on_empty=False,
@@ -2177,6 +2204,52 @@
return ro_task_dependency, task_index
raise NsWorkerException("Cannot get depending task {}".format(task_id))
+ def update_vm_refresh(self):
+ """Enables the VM status updates if self.refresh_config.active parameter
+ is not -1 and than updates the DB accordingly
+
+ """
+ try:
+ self.logger.debug("Checking if VM status update config")
+ next_refresh = time.time()
+ if self.refresh_config.active == -1:
+ next_refresh = -1
+ else:
+ next_refresh += self.refresh_config.active
+
+ if next_refresh != -1:
+ db_ro_task_update = {}
+ now = time.time()
+ next_check_at = now + (24 * 60 * 60)
+ next_check_at = min(next_check_at, next_refresh)
+ db_ro_task_update["vim_info.refresh_at"] = next_refresh
+ db_ro_task_update["to_check_at"] = next_check_at
+
+ self.logger.debug(
+ "Finding tasks which to be updated to enable VM status updates"
+ )
+ refresh_tasks = self.db.get_list(
+ "ro_tasks",
+ q_filter={
+ "tasks.status": "DONE",
+ "to_check_at.lt": 0,
+ },
+ )
+ self.logger.debug("Updating tasks to change the to_check_at status")
+ for task in refresh_tasks:
+ q_filter = {
+ "_id": task["_id"],
+ }
+ self.db.set_one(
+ "ro_tasks",
+ q_filter=q_filter,
+ update_dict=db_ro_task_update,
+ fail_on_empty=True,
+ )
+
+ except Exception as e:
+ self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
+
def _process_pending_tasks(self, ro_task):
ro_task_id = ro_task["_id"]
now = time.time()
@@ -2194,13 +2267,16 @@
next_refresh = time.time()
if task["item"] in ("image", "flavor"):
- next_refresh += self.REFRESH_IMAGE
+ next_refresh += self.refresh_config.image
elif new_status == "BUILD":
- next_refresh += self.REFRESH_BUILD
+ next_refresh += self.refresh_config.build
elif new_status == "DONE":
- next_refresh += self.REFRESH_ACTIVE
+ if self.refresh_config.active == -1:
+ next_refresh = -1
+ else:
+ next_refresh += self.refresh_config.active
else:
- next_refresh += self.REFRESH_ERROR
+ next_refresh += self.refresh_config.error
next_check_at = min(next_check_at, next_refresh)
db_ro_task_update["vim_info.refresh_at"] = next_refresh
@@ -2212,6 +2288,8 @@
if self.logger.getEffectiveLevel() == logging.DEBUG:
self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
"""
+ # Check if vim status refresh is enabled again
+ self.update_vm_refresh()
# 0: get task_status_create
lock_object = None
task_status_create = None
@@ -2375,11 +2453,9 @@
# self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
_update_refresh(new_status)
else:
- if (
- ro_task["vim_info"]["refresh_at"]
- and now > ro_task["vim_info"]["refresh_at"]
- ):
- new_status, db_vim_info_update = self.item2class[
+ refresh_at = ro_task["vim_info"]["refresh_at"]
+ if refresh_at and refresh_at != -1 and now > refresh_at:
+ (new_status, db_vim_info_update,) = self.item2class[
task["item"]
].refresh(ro_task)
_update_refresh(new_status)
diff --git a/NG-RO/osm_ng_ro/ro.cfg b/NG-RO/osm_ng_ro/ro.cfg
index 6969702..e232ad1 100644
--- a/NG-RO/osm_ng_ro/ro.cfg
+++ b/NG-RO/osm_ng_ro/ro.cfg
@@ -62,6 +62,13 @@
task_max_locked_time: 1200 # lock is renewed until this maximum time
task_relock_time: 15 # 30s before expiring lock time, it is re-locked again
+[period]
+# use env for OSMRO_PERIOD_XXX
+refresh_active: 60 # default 1 min
+refresh_build: 15 # default 15 seconds
+refresh_image: 3600 * 10
+refresh_error: 600
+queue_size: 100
[database]
# use env OSMRO_DATABASE_XXX to override
diff --git a/NG-RO/osm_ng_ro/ro_main.py b/NG-RO/osm_ng_ro/ro_main.py
index 2a2e22c..376c087 100644
--- a/NG-RO/osm_ng_ro/ro_main.py
+++ b/NG-RO/osm_ng_ro/ro_main.py
@@ -854,7 +854,7 @@
elif k1 == "tools":
# update [/] configuration
engine_config["/"]["tools." + k2.replace("_", ".")] = yaml.safe_load(v)
- elif k1 in ("message", "database", "storage", "authentication"):
+ elif k1 in ("message", "database", "storage", "authentication", "period"):
engine_config[k1][k2] = yaml.safe_load(v)
except Exception as e:
diff --git a/NG-RO/osm_ng_ro/tests/test_ns_thread.py b/NG-RO/osm_ng_ro/tests/test_ns_thread.py
index 86ba996..b120901 100644
--- a/NG-RO/osm_ng_ro/tests/test_ns_thread.py
+++ b/NG-RO/osm_ng_ro/tests/test_ns_thread.py
@@ -19,7 +19,10 @@
import unittest
from unittest.mock import MagicMock, patch
+from osm_common.dbmemory import DbMemory
from osm_ng_ro.ns_thread import (
+ ConfigValidate,
+ NsWorker,
VimInteractionAffinityGroup,
VimInteractionMigration,
VimInteractionNet,
@@ -28,6 +31,233 @@
from osm_ro_plugin.vimconn import VimConnConnectionException, VimConnException
+class TestConfigValidate(unittest.TestCase):
+ def setUp(self):
+ self.config_dict = {
+ "period": {
+ "refresh_active": 65,
+ "refresh_build": 20,
+ "refresh_image": 3600,
+ "refresh_error": 300,
+ "queue_size": 50,
+ }
+ }
+
+ def test__get_configuration(self):
+ with self.subTest(i=1, t="Get config attributes with config input"):
+ configuration = ConfigValidate(self.config_dict)
+ self.assertEqual(configuration.active, 65)
+ self.assertEqual(configuration.build, 20)
+ self.assertEqual(configuration.image, 3600)
+ self.assertEqual(configuration.error, 300)
+ self.assertEqual(configuration.queue_size, 50)
+
+ with self.subTest(i=2, t="Unallowed refresh active input"):
+ # > 60 (except -1) is not allowed to set, so it should return default value 60
+ self.config_dict["period"]["refresh_active"] = 20
+ configuration = ConfigValidate(self.config_dict)
+ self.assertEqual(configuration.active, 60)
+
+ with self.subTest(i=3, t="Config to disable VM status periodic checks"):
+ # -1 is allowed to set to disable VM status updates
+ self.config_dict["period"]["refresh_active"] = -1
+ configuration = ConfigValidate(self.config_dict)
+ self.assertEqual(configuration.active, -1)
+
+
+class TestNsWorker(unittest.TestCase):
+ def setUp(self):
+ self.task_depends = None
+ self.plugins = {}
+ self.worker_index = "worker-3"
+ self.config = {
+ "period": {
+ "refresh_active": 60,
+ "refresh_build": 20,
+ "refresh_image": 3600,
+ "refresh_error": 600,
+ "queue_size": 100,
+ },
+ "process_id": "343435353",
+ "global": {"task_locked_time": 16373242100.994312},
+ }
+
+ self.ro_task = {
+ "_id": "122436:1",
+ "locked_by": None,
+ "locked_at": 0.0,
+ "target_id": "vim_openstack_1",
+ "vim_info": {
+ "created": False,
+ "created_items": None,
+ "vim_id": "test-vim-id",
+ "vim_name": "test-vim",
+ "vim_status": "DONE",
+ "vim_details": "",
+ "vim_message": None,
+ "refresh_at": None,
+ },
+ "modified_at": 1637324200.994312,
+ "created_at": 1637324200.994312,
+ "to_check_at": 16373242400.994312,
+ "tasks": [
+ {
+ "target_id": 0,
+ "action_id": "123456",
+ "nsr_id": "654321",
+ "task_id": "123456:1",
+ "status": "DONE",
+ "action": "CREATE",
+ "item": "test_item",
+ "target_record": "test_target_record",
+ "target_record_id": "test_target_record_id",
+ },
+ ],
+ }
+
+ def get_disabled_tasks(self, db, status):
+ db_disabled_tasks = db.get_list(
+ "ro_tasks",
+ q_filter={
+ "tasks.status": status,
+ "to_check_at.lt": 0,
+ },
+ )
+ return db_disabled_tasks
+
+ def test__update_vm_refresh(self):
+ with self.subTest(
+ i=1,
+ t="1 disabled task with status BUILD in DB, refresh_active parameter is not equal to -1",
+ ):
+ # Disabled task with status build will not enabled again
+ db = DbMemory()
+ self.ro_task["tasks"][0]["status"] = "BUILD"
+ self.ro_task["to_check_at"] = -1
+ db.create("ro_tasks", self.ro_task)
+ disabled_tasks_count = len(self.get_disabled_tasks(db, "BUILD"))
+ instance = NsWorker(self.worker_index, self.config, self.plugins, db)
+ with patch.object(instance, "logger", logging):
+ instance.update_vm_refresh()
+ self.assertEqual(
+ len(self.get_disabled_tasks(db, "BUILD")), disabled_tasks_count
+ )
+
+ with self.subTest(
+ i=2,
+ t="1 disabled task with status DONE in DB, refresh_active parameter is equal to -1",
+ ):
+ # As refresh_active parameter is equal to -1, task will not be enabled to process again
+ db = DbMemory()
+ self.config["period"]["refresh_active"] = -1
+ self.ro_task["tasks"][0]["status"] = "DONE"
+ self.ro_task["to_check_at"] = -1
+ db.create("ro_tasks", self.ro_task)
+ disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE"))
+ instance = NsWorker(self.worker_index, self.config, self.plugins, db)
+ with patch.object(instance, "logger", logging):
+ instance.update_vm_refresh()
+ self.assertEqual(
+ len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count
+ )
+
+ with self.subTest(
+ i=3,
+ t="2 disabled task with status DONE in DB, refresh_active parameter is not equal to -1",
+ ):
+ # Disabled tasks should be enabled to process again
+ db = DbMemory()
+ self.config["period"]["refresh_active"] = 66
+ self.ro_task["tasks"][0]["status"] = "DONE"
+ self.ro_task["to_check_at"] = -1
+ db.create("ro_tasks", self.ro_task)
+ self.ro_task2 = self.ro_task
+ self.ro_task2["_id"] = "122437:1"
+ db.create("ro_tasks", self.ro_task2)
+ disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE"))
+ instance = NsWorker(self.worker_index, self.config, self.plugins, db)
+ with patch.object(instance, "logger", logging):
+ instance.update_vm_refresh()
+ self.assertEqual(
+ len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count - 2
+ )
+
+ with self.subTest(
+ i=4,
+ t="No disabled task with status DONE in DB, refresh_active parameter is not equal to -1",
+ ):
+ # If there is not any disabled task, method will not change anything
+ db = DbMemory()
+ self.config["period"]["refresh_active"] = 66
+ self.ro_task["tasks"][0]["status"] = "DONE"
+ self.ro_task["to_check_at"] = 16373242400.994312
+ db.create("ro_tasks", self.ro_task)
+ self.ro_task2 = self.ro_task
+ self.ro_task2["_id"] = "122437:1"
+ db.create("ro_tasks", self.ro_task2)
+ disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE"))
+ instance = NsWorker(self.worker_index, self.config, self.plugins, db)
+ with patch.object(instance, "logger", logging):
+ instance.update_vm_refresh()
+ self.assertEqual(
+ len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count
+ )
+
+ def test__process_pending_tasks(self):
+ with self.subTest(
+ i=1,
+ t="refresh_active parameter is equal to -1, task status is DONE",
+ ):
+ # Task should be disabled to process again
+ db = DbMemory()
+ self.config["period"]["refresh_active"] = -1
+ self.ro_task["tasks"][0]["status"] = "DONE"
+ self.ro_task["to_check_at"] = 16373242400.994312
+ db.create("ro_tasks", self.ro_task)
+ # Number of disabled tasks in DB
+ disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE"))
+ instance = NsWorker(self.worker_index, self.config, self.plugins, db)
+ with patch.object(instance, "logger", logging):
+ instance._process_pending_tasks(self.ro_task)
+ self.assertEqual(
+ len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count + 1
+ )
+
+ with self.subTest(
+ i=2, t="refresh_active parameter is equal to -1, task status is FAILED"
+ ):
+ # Task will not be disabled to process as task status is not DONE
+ db = DbMemory()
+ self.config["period"]["refresh_active"] = -1
+ self.ro_task["tasks"][0]["status"] = "FAILED"
+ self.ro_task["to_check_at"] = 16373242400.994312
+ db.create("ro_tasks", self.ro_task)
+ disabled_tasks_count = len(self.get_disabled_tasks(db, "FAILED"))
+ instance = NsWorker(self.worker_index, self.config, self.plugins, db)
+ with patch.object(instance, "logger", logging):
+ instance._process_pending_tasks(self.ro_task)
+ self.assertEqual(
+ len(self.get_disabled_tasks(db, "FAILED")), disabled_tasks_count
+ )
+
+ with self.subTest(
+ i=3, t="refresh_active parameter is not equal to -1, task status is DONE"
+ ):
+ # Task will not be disabled to process as refresh_active parameter is not -1
+ db = DbMemory()
+ self.config["period"]["refresh_active"] = 70
+ self.ro_task["tasks"][0]["status"] = "DONE"
+ self.ro_task["to_check_at"] = 16373242400.994312
+ db.create("ro_tasks", self.ro_task)
+ disabled_tasks_count = len(self.get_disabled_tasks(db, "DONE"))
+ instance = NsWorker(self.worker_index, self.config, self.plugins, db)
+ with patch.object(instance, "logger", logging):
+ instance._process_pending_tasks(self.ro_task)
+ self.assertEqual(
+ len(self.get_disabled_tasks(db, "DONE")), disabled_tasks_count
+ )
+
+
class TestVimInteractionNet(unittest.TestCase):
def setUp(self):
module_name = "osm_ro_plugin"
diff --git a/devops-stages/stage-build.sh b/devops-stages/stage-build.sh
index 2cd4689..8818289 100755
--- a/devops-stages/stage-build.sh
+++ b/devops-stages/stage-build.sh
@@ -37,14 +37,9 @@
dist_ro_vim_gcp"
TOX_ENV_LIST="$(echo $PACKAGES | sed "s/ /,/g")"
-<<<<<<< ours
-
-TOX_PARALLEL_NO_SPINNER=1 tox -e $TOX_ENV_LIST --parallel auto
-=======
PROCESSES=$(expr `nproc --a` / 2)
TOX_PARALLEL_NO_SPINNER=1 tox -e $TOX_ENV_LIST --parallel $PROCESSES
->>>>>>> theirs
# Copying packages
# RO plugin
diff --git a/devops-stages/stage-releasenote.sh b/devops-stages/stage-releasenote.sh
old mode 100644
new mode 100755
diff --git a/releasenotes/notes/fix_bug_2086-344f52762a42cdb2.yaml b/releasenotes/notes/fix_bug_2086-344f52762a42cdb2.yaml
new file mode 100644
index 0000000..186465e
--- /dev/null
+++ b/releasenotes/notes/fix_bug_2086-344f52762a42cdb2.yaml
@@ -0,0 +1,25 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+---
+fixes:
+ - |
+ Fix Bug 2086 Updating VNF status configurable
+ This fix allows to set REFRESH_ACTIVE period as config option which
+ periodically checks the VM status from VIM. Env variable can be set in the
+ ro container export OSMRO_PERIOD_REFRESH_ACTIVE=-1 to disable VM status updates.
+ This config parameter allowed to set >= 60 seconds or -1.
+