# devops-stages/stage-build.sh
#
-FROM ubuntu:20.04
+FROM ubuntu:22.04
ARG APT_PROXY
RUN if [ ! -z $APT_PROXY ] ; then \
python3 \
python3-all \
python3-dev \
- python3-setuptools
+ python3-setuptools \
+ python3-pip \
+ tox
-RUN python3 -m easy_install pip==21.3.1
-RUN pip install tox==3.24.5
+ENV LC_ALL C.UTF-8
+ENV LANG C.UTF-8
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import json
import logging
import operator
class AlarmingService:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.conf = config
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
self.db_client = CommonDbClient(config)
- self.mon_client = MonClient(config, loop=self.loop)
- self.lcm_client = LcmClient(config, loop=self.loop)
+ self.mon_client = MonClient(config)
+ self.lcm_client = LcmClient(config)
async def configure_vnf_alarms(self, nsr_id: str, vnf_member_index=None):
log.info("Configuring vnf alarms for network service %s", nsr_id)
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import datetime
import json
import logging
class AutoscalingService:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.conf = config
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
self.db_client = CommonDbClient(config)
- self.mon_client = MonClient(config, loop=self.loop)
- self.lcm_client = LcmClient(config, loop=self.loop)
+ self.mon_client = MonClient(config)
+ self.lcm_client = LcmClient(config)
async def configure_scaling_groups(self, nsr_id: str, vnf_member_index=None):
"""
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
import argparse
-import asyncio
import logging
import sys
import os
db_manager.create_tables()
log.info("Database initialized correctly.")
log.info("Starting policy module agent...")
- loop = asyncio.get_event_loop()
- agent = PolicyModuleAgent(cfg, loop)
+ agent = PolicyModuleAgent(cfg)
agent.run()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import datetime
import json
import logging
Client to communicate with LCM through the message bus.
"""
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.db_client = CommonDbClient(config)
self.msg_bus = MessageBusClient(config)
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
async def scale(
self, nsr_id: str, scaling_group_name: str, vnf_member_index: str, action: str
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
from typing import List, Callable
from osm_common import msgkafka, msglocal
class MessageBusClient:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
if config.get("message", "driver") == "local":
self.msg_bus = msglocal.MsgLocal()
elif config.get("message", "driver") == "kafka":
"Unknown message bug driver {}".format(config.get("section", "driver"))
)
self.msg_bus.connect(config.get("message"))
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
"""
:param kwargs: Keyword arguments to be passed to callback function.
:return: None
"""
- await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+ await self.msg_bus.aioread(topics, aiocallback=callback, **kwargs)
async def aiowrite(self, topic: str, key: str, msg: dict):
"""
:param msg: Dictionary containing message to be written.
:return: None
"""
- await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+ await self.msg_bus.aiowrite(topic, key, msg)
async def aioread_once(self, topic: str):
"""
:param topic: topic to retrieve message from.
:return: tuple(topic, key, message)
"""
- result = await self.msg_bus.aioread(topic, self.loop)
+ result = await self.msg_bus.aioread(topic)
return result
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import json
import logging
import random
class MonClient:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.kafka_server = "{}:{}".format(
config.get("message", "host"), config.get("message", "port")
)
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
async def create_alarm(
self,
)
log.debug("Sending create_alarm_request %s", msg)
producer = AIOKafkaProducer(
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
value_serializer=str.encode,
log.debug("Waiting for create_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
)
log.debug("Sending delete_alarm_request %s", msg)
producer = AIOKafkaProducer(
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
value_serializer=str.encode,
log.debug("Waiting for delete_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
class PolicyModuleAgent:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.conf = config
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
self.msg_bus = MessageBusClient(config)
self.db_client = CommonDbClient(config)
- self.autoscaling_service = AutoscalingService(config, loop)
- self.alarming_service = AlarmingService(config, loop)
- self.healing_service = HealingService(config, loop)
+ self.autoscaling_service = AutoscalingService(config)
+ self.alarming_service = AlarmingService(config)
+ self.healing_service = HealingService(config)
def run(self):
- self.loop.run_until_complete(self.start())
+ asyncio.run(self.start())
async def start(self):
Path("/tmp/osm_pol_agent_health_flag").touch()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import logging
import datetime
class HealingService:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
"""
Initializing the HealingService
"""
log.info("HealingService Initialized")
self.conf = config
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
self.db_client = CommonDbClient(config)
- self.mon_client = MonClient(config, loop=self.loop)
- self.lcm_client = LcmClient(config, loop=self.loop)
+ self.mon_client = MonClient(config)
+ self.lcm_client = LcmClient(config)
log.info("Constructor created for HealingService")
async def configure_healing_alarms(self, nsr_id: str):
create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
create_alarm.assert_not_called_with = assert_not_called_with
config = Config()
- agent = PolicyModuleAgent(config, self.loop)
+ agent = PolicyModuleAgent(config)
self.loop.run_until_complete(
agent.autoscaling_service.configure_scaling_groups("test_nsr_id")
)
get_vnfd.return_value = vnfd_record_mock
create_alarm.side_effect = _test_configure_vnf_alarms_create_alarm
config = Config()
- agent = PolicyModuleAgent(config, self.loop)
+ agent = PolicyModuleAgent(config)
self.loop.run_until_complete(
agent.alarming_service.configure_vnf_alarms("test_nsr_id")
)
get_vnfd.return_value = vnfd_record_mock
create_alarm.side_effect = _test_configure_healing_alarms_create_alarm
config = Config()
- agent = PolicyModuleAgent(config, self.loop)
+ agent = PolicyModuleAgent(config)
self.loop.run_until_complete(
agent.healing_service.configure_healing_alarms("test_nsr_id")
)
class TestAlarmingService(TestCase):
def setUp(self):
self.config = Config()
- self.loop = asyncio.new_event_loop()
self.payload = {"notify_details": {"alarm_number": 0}}
self.headers = {"content-type": "application/json"}
get_alarm.return_value = mock_alarm
service = AlarmingService(self.config)
if bool(self.config.get("alert", "enhanced_alarms")):
- self.loop.run_until_complete(
- service.handle_alarm("test_id", "alarm", self.payload)
- )
+ asyncio.run(service.handle_alarm("test_id", "alarm", self.payload))
requests_post.assert_called_once_with(
url="http://alarm-url/",
data='{"notify_details": {"alarm_number": 1}}',
timeout=alert_timeout,
)
else:
- self.loop.run_until_complete(service.handle_alarm("test_id", "alarm", {}))
+ asyncio.run(service.handle_alarm("test_id", "alarm", {}))
requests_post.assert_called_once_with(
json="{}", url="http://alarm-url/", timeout=alert_timeout
)
get_alarm.return_value = mock_alarm
service = AlarmingService(self.config)
if bool(self.config.get("alert", "enhanced_alarms")):
- self.loop.run_until_complete(
- service.handle_alarm("test_id", "ok", self.payload)
- )
+ asyncio.run(service.handle_alarm("test_id", "ok", self.payload))
requests_post.assert_called_once_with(
url="http://ok-url/",
data='{"notify_details": {"alarm_number": 0}}',
timeout=alert_timeout,
)
else:
- self.loop.run_until_complete(service.handle_alarm("test_id", "ok", {}))
+ asyncio.run(service.handle_alarm("test_id", "ok", {}))
requests_post.assert_called_once_with(
json="{}", url="http://ok-url/", timeout=alert_timeout
)
get_alarm.return_value = mock_alarm
service = AlarmingService(self.config)
if bool(self.config.get("alert", "enhanced_alarms")):
- self.loop.run_until_complete(
+ asyncio.run(
service.handle_alarm("test_id", "insufficient-data", self.payload)
)
requests_post.assert_called_once_with(
timeout=alert_timeout,
)
else:
- self.loop.run_until_complete(
- service.handle_alarm("test_id", "insufficient-data", {})
- )
+ asyncio.run(service.handle_alarm("test_id", "insufficient-data", {}))
requests_post.assert_called_once_with(
json="{}", url="http://insufficient-data-url/", timeout=alert_timeout
)
mock_alarm = self._build_mock_alarm("test_id")
get_alarm.return_value = mock_alarm
service = AlarmingService(self.config)
- self.loop.run_until_complete(service.handle_alarm("test_id", "unknown", {}))
+ asyncio.run(service.handle_alarm("test_id", "unknown", {}))
requests_post.assert_not_called()
def _build_mock_alarm(
class TestAutoscalingService(TestCase):
def setUp(self):
self.config = Config()
- self.loop = asyncio.new_event_loop()
@mock.patch.object(ScalingAlarmRepository, "get")
@mock.patch("osm_policy_module.core.database.db")
get_alarm.return_value = mock_alarm
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.update_alarm_status("test_uuid", "alarm"))
+ asyncio.run(service.update_alarm_status("test_uuid", "alarm"))
self.assertEqual(mock_alarm.last_status, "alarm")
mock_alarm.save.assert_called_with()
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.update_alarm_status("test_uuid", "ok"))
+ asyncio.run(service.update_alarm_status("test_uuid", "ok"))
self.assertEqual(mock_alarm.last_status, "ok")
mock_alarm.save.assert_called_with()
service = AutoscalingService(self.config)
- self.loop.run_until_complete(
- service.update_alarm_status("test_uuid", "insufficient_data")
- )
+ asyncio.run(service.update_alarm_status("test_uuid", "insufficient_data"))
self.assertEqual(mock_alarm.last_status, "insufficient_data")
mock_alarm.save.assert_called_with()
get_alarm.return_value = mock_alarm
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
list_alarms.assert_not_called()
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale in with AND operation, both alarms triggered
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_in")
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale in with AND operation, only one alarm triggered.
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_not_called()
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale in with OR operation, both alarms triggered
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_in")
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale in with OR operation, only one alarm triggered
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_in")
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale out with AND operation, both alarms triggered
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_out")
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale out with AND operation, only one alarm triggered.
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_not_called()
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale out with OR operation, both alarms triggered
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_out")
@mock.patch.object(ScalingAlarmRepository, "list")
"""
Tests scale out with OR operation, only one alarm triggered
"""
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
scale.return_value = future
list_alarms.return_value = [mock_alarm, mock_alarm_2]
service = AutoscalingService(self.config)
- self.loop.run_until_complete(service.evaluate_policy("test_uuid"))
+ asyncio.run(service.evaluate_policy("test_uuid"))
scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_out")
def _build_mock_alarm(
def setUp(self):
self.config = Config()
self.config.set("message", "driver", "kafka")
- self.loop = asyncio.new_event_loop()
@mock.patch.object(MsgKafka, "aioread")
def test_aioread(self, aioread):
async def mock_callback():
pass
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
aioread.return_value = future
- msg_bus = MessageBusClient(self.config, loop=self.loop)
+ msg_bus = MessageBusClient(self.config)
topic = "test_topic"
- self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))
- aioread.assert_called_with(["test_topic"], self.loop, aiocallback=mock_callback)
+ asyncio.run(msg_bus.aioread([topic], mock_callback))
+ aioread.assert_called_with(["test_topic"], aiocallback=mock_callback)
@mock.patch.object(MsgKafka, "aiowrite")
def test_aiowrite(self, aiowrite):
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
aiowrite.return_value = future
- msg_bus = MessageBusClient(self.config, loop=self.loop)
+ msg_bus = MessageBusClient(self.config)
topic = "test_topic"
key = "test_key"
msg = {"test": "test_msg"}
- self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))
- aiowrite.assert_called_with(topic, key, msg, self.loop)
+ asyncio.run(msg_bus.aiowrite(topic, key, msg))
+ aiowrite.assert_called_with(topic, key, msg)
@mock.patch.object(MsgKafka, "aioread")
def test_aioread_once(self, aioread):
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
aioread.return_value = future
- msg_bus = MessageBusClient(self.config, loop=self.loop)
+ msg_bus = MessageBusClient(self.config)
topic = "test_topic"
- self.loop.run_until_complete(msg_bus.aioread_once(topic))
- aioread.assert_called_with("test_topic", self.loop)
+ asyncio.run(msg_bus.aioread_once(topic))
+ aioread.assert_called_with("test_topic")
pass
config = Config()
- agent = PolicyModuleAgent(config, self.loop)
+ agent = PolicyModuleAgent(config)
assert autoscaling_lcm_client.called
assert autoscaling_mon_client.called
assert alarming_lcm_client.called
pass
config = Config()
- agent = PolicyModuleAgent(config, self.loop)
+ agent = PolicyModuleAgent(config)
assert autoscaling_lcm_client.called
assert autoscaling_mon_client.called
assert alarming_lcm_client.called
pass
config = Config()
- agent = PolicyModuleAgent(config, self.loop)
+ agent = PolicyModuleAgent(config)
assert autoscaling_lcm_client.called
assert autoscaling_mon_client.called
assert alarming_lcm_client.called
pass
config = Config()
- agent = PolicyModuleAgent(config, self.loop)
+ agent = PolicyModuleAgent(config)
assert autoscaling_lcm_client.called
assert autoscaling_mon_client.called
assert alarming_lcm_client.called
# aiokafka
dataclasses==0.6
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+dnspython==2.3.0
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+ # pymongo
kafka-python==2.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# aiokafka
-motor==1.3.1
+motor==3.1.2
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@master
# via -r requirements-dev.in
-packaging==23.0
+packaging==23.1
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# aiokafka
pycryptodome==3.17
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-pymongo==3.13.0
+pymongo==4.3.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# motor
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-coverage==7.1.0
+coverage==7.2.5
# via -r requirements-test.in
-mock==5.0.1
+mock==5.0.2
# via -r requirements-test.in
-nose2==0.12.0
+nose2==0.13.0
# via -r requirements-test.in
# limitations under the License.
aiokafka
-peewee==3.8.*
-jsonschema==2.6.*
+peewee
+jsonschema
pyyaml==5.4.1
-pymysql==0.9.*
-peewee-migrate==1.1.*
-requests==2.*
+pymysql
+peewee-migrate
+requests
# via -r requirements.in
async-timeout==4.0.2
# via aiokafka
-cached-property==1.5.2
- # via peewee-migrate
-certifi==2022.12.7
+attrs==23.1.0
+ # via jsonschema
+certifi==2023.5.7
# via requests
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
# via requests
click==8.1.3
# via peewee-migrate
idna==3.4
# via requests
-jsonschema==2.6.0
+jsonschema==4.17.3
# via -r requirements.in
kafka-python==2.0.2
# via aiokafka
-mock==5.0.1
- # via peewee-migrate
-packaging==23.0
+packaging==23.1
# via aiokafka
-peewee==3.8.2
+peewee==3.16.2
# via
# -r requirements.in
# peewee-migrate
-peewee-migrate==1.1.6
+peewee-migrate==1.7.1
# via -r requirements.in
-pymysql==0.9.3
+pymysql==1.0.3
# via -r requirements.in
+pyrsistent==0.19.3
+ # via jsonschema
pyyaml==5.4.1
# via -r requirements.in
-requests==2.28.2
+requests==2.30.0
# via -r requirements.in
-urllib3==1.26.14
+urllib3==2.0.2
# via requests
[testenv]
usedevelop = True
-basepython = python3.8
+basepython = python3.10
setenv = VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt