# devops-stages/stage-build.sh
#
-FROM ubuntu:20.04
+FROM ubuntu:22.04
ARG APT_PROXY
RUN if [ ! -z $APT_PROXY ] ; then \
python3 \
python3-all \
python3-dev \
- python3-setuptools
+ python3-setuptools \
+ python3-pip
-RUN python3 -m easy_install pip==21.3.1
-RUN pip install tox==3.24.5
+RUN python3 -m pip install -U pip==23.1
+RUN pip install tox
ENV LC_ALL C.UTF-8
ENV LANG C.UTF-8
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import asyncio
+
from base64 import b64decode, b64encode
from copy import deepcopy
from http import HTTPStatus
class Encryption(DbBase):
- def __init__(self, uri, config, encoding_type="ascii", loop=None, logger_name="db"):
+ def __init__(self, uri, config, encoding_type="ascii", logger_name="db"):
"""Constructor.
Args:
uri (str): Connection string to connect to the database.
config (dict): Additional database info
encoding_type (str): ascii, utf-8 etc.
- loop (object): Asyncio Loop
logger_name (str): Logger name
"""
- self.loop = loop or asyncio.get_event_loop()
self._secret_key = None # 32 bytes length array used for encrypt/decrypt
self.encrypt_mode = AES.MODE_ECB
super(Encryption, self).__init__(
"Method 'read' not implemented", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
)
- async def aiowrite(self, topic, key, msg, loop=None):
+ async def aiowrite(self, topic, key, msg):
raise MsgException(
"Method 'aiowrite' not implemented",
http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
)
async def aioread(
- self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+ self, topic, callback=None, aiocallback=None, group_id=None, **kwargs
):
raise MsgException(
"Method 'aioread' not implemented",
self.port = None
self.consumer = None
self.producer = None
- self.loop = None
self.broker = None
self.group_id = None
self.logger = logging.getLogger(config["logger_name"])
self.host = config["host"]
self.port = config["port"]
- self.loop = config.get("loop") or asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
self.group_id = config.get("group_id")
def disconnect(self):
try:
pass
- # self.loop.close()
except Exception as e: # TODO refine
raise MsgException(str(e))
retry = 2 # Try two times
while retry:
try:
- self.loop.run_until_complete(
- self.aiowrite(topic=topic, key=key, msg=msg)
- )
+ asyncio.run(self.aiowrite(topic=topic, key=key, msg=msg))
break
except Exception as e:
retry -= 1
:return: topic, key, message; or None
"""
try:
- return self.loop.run_until_complete(self.aioread(topic, self.loop))
+ return asyncio.run(self.aioread(topic))
except MsgException:
raise
except Exception as e:
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
- async def aiowrite(self, topic, key, msg, loop=None):
+ async def aiowrite(self, topic, key, msg):
"""
Asyncio write
:param topic: str kafka topic
:param key: str kafka key
:param msg: str or dictionary kafka message
- :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
:return: None
"""
-
- if not loop:
- loop = self.loop
try:
self.producer = AIOKafkaProducer(
- loop=loop,
key_serializer=str.encode,
value_serializer=str.encode,
bootstrap_servers=self.broker,
async def aioread(
self,
topic,
- loop=None,
callback=None,
aiocallback=None,
group_id=None,
"""
Asyncio read from one or several topics.
:param topic: can be str: single topic; or str list: several topics
- :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
:param callback: synchronous callback function that will handle the message in kafka bus
:param aiocallback: async callback function that will handle the message in kafka bus
:param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
:param kwargs: optional keyword arguments for callback function
:return: If no callback defined, it returns (topic, key, message)
"""
-
- if not loop:
- loop = self.loop
if group_id is False:
group_id = None
elif group_id is None:
else:
topic_list = (topic,)
self.consumer = AIOKafkaConsumer(
- loop=loop,
bootstrap_servers=self.broker,
group_id=group_id,
auto_offset_reset="earliest" if from_beginning else "latest",
self.files_read = {}
self.files_write = {}
self.buffer = {}
- self.loop = None
def connect(self, config):
try:
self.path += "/"
if not os.path.exists(self.path):
os.mkdir(self.path)
- self.loop = config.get("loop")
except MsgException:
raise
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
async def aioread(
- self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+ self, topic, callback=None, aiocallback=None, group_id=None, **kwargs
):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
- :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
:param callback: synchronous callback function that will handle the message
:param aiocallback: async callback function that will handle the message
:param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
:param kwargs: optional keyword arguments for callback function
:return: If no callback defined, it returns (topic, key, message)
"""
- _loop = loop or self.loop
try:
while True:
msg = self.read(topic, blocks=False)
await aiocallback(*msg, **kwargs)
else:
return msg
- await asyncio.sleep(2, loop=_loop)
+ await asyncio.sleep(2)
except MsgException:
raise
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
- async def aiowrite(self, topic, key, msg, loop=None):
+ async def aiowrite(self, topic, key, msg):
"""
Asyncio write. It blocks
:param topic: str
:param key: str
:param msg: message, can be str or yaml
- :param loop: asyncio loop
:return: nothing if ok or raises an exception
"""
return self.write(topic, key, msg)
def setUp(self, mock_logger):
mock_logger = logging.getLogger()
mock_logger.disabled = True
- self.loop = asyncio.get_event_loop()
self.encryption = Encryption(uri="uri", config={})
self.encryption.encoding_type = encoding_type
self.encryption.encrypt_mode = encyrpt_mode
"ip": "192.168.12.23",
}
fields = ["secret", "cacert"]
- self.loop.run_until_complete(
+
+ asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, expected_item)
"""item is empty and fields exists."""
input_item = {}
fields = ["secret", "cacert"]
- self.loop.run_until_complete(
+ asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, {})
"""item exists and fields is empty."""
input_item = copy.deepcopy(item)
fields = []
- self.loop.run_until_complete(
+ asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, item)
"path": "/var",
"ip": "192.168.12.23",
}
- self.loop.run_until_complete(
+ asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, expected_item)
mock_decrypt.return_value = "mysecret"
input_item = copy.deepcopy(item)
fields = ["secret"]
- self.loop.run_until_complete(
+ asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, item)
fields = ["secret"]
input_item = copy.deepcopy(item)
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(
+ asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(
def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
"""Method decrypt raises error."""
mock_encrypt_value.return_value = encyrpted_value
- result = self.loop.run_until_complete(
- self.encryption.encrypt(value, schema_version, salt)
- )
+ result = asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(result, encyrpted_value)
mock_get_secret_key.assert_called_once()
mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
"""Method get_secret_key raises error."""
mock_get_secret_key.side_effect = DbException("Unexpected type.")
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(
- self.encryption.encrypt(value, schema_version, salt)
- )
+ asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(str(error.exception), "database exception Unexpected type.")
mock_get_secret_key.assert_called_once()
mock_encrypt_value.assert_not_called()
"A bytes-like object is required, not 'str'"
)
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(
- self.encryption.encrypt(value, schema_version, salt)
- )
+ asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(
str(error.exception), "A bytes-like object is required, not 'str'"
)
def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
"""Decrypted successfully."""
mock_decrypt_value.return_value = value
- result = self.loop.run_until_complete(
+ result = asyncio.run(
self.encryption.decrypt(encyrpted_value, schema_version, salt)
)
self.assertEqual(result, value)
"""Method get_secret_key raises error."""
mock_get_secret_key.side_effect = DbException("Unexpected type.")
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(
- self.encryption.decrypt(encyrpted_value, schema_version, salt)
- )
+ asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
self.assertEqual(str(error.exception), "database exception Unexpected type.")
mock_get_secret_key.assert_called_once()
mock_decrypt_value.assert_not_called()
"A bytes-like object is required, not 'str'"
)
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(
- self.encryption.decrypt(encyrpted_value, schema_version, salt)
- )
+ asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
self.assertEqual(
str(error.exception), "A bytes-like object is required, not 'str'"
)
def test_get_secret_key_exists(self, mock_join_keys):
"""secret_key exists."""
self.encryption._secret_key = secret_key
- self.loop.run_until_complete(self.encryption.get_secret_key())
+ asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, secret_key)
mock_join_keys.assert_not_called()
self.encryption._admin_collection.find_one.return_value = None
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.return_value = joined_key
- self.loop.run_until_complete(self.encryption.get_secret_key())
+ asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, joined_key)
self.assertEqual(mock_join_keys.call_count, 1)
mock_b64decode.assert_not_called()
self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.return_value = joined_key
- self.loop.run_until_complete(self.encryption.get_secret_key())
+ asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, joined_key)
self.assertEqual(mock_join_keys.call_count, 1)
mock_b64decode.assert_not_called()
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.side_effect = [secret_key, joined_key]
mock_b64decode.return_value = base64_decoded_serial
- self.loop.run_until_complete(self.encryption.get_secret_key())
+ asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, joined_key)
self.assertEqual(mock_join_keys.call_count, 2)
mock_b64decode.assert_called_once_with(serial_bytes)
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.side_effect = DbException("Invalid data type.")
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(self.encryption.get_secret_key())
+ asyncio.run(self.encryption.get_secret_key())
self.assertEqual(str(error.exception), "database exception Invalid data type.")
self.assertEqual(mock_join_keys.call_count, 1)
check_if_assert_not_called(
"A bytes-like object is required, not 'str'"
)
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(self.encryption.get_secret_key())
+ asyncio.run(self.encryption.get_secret_key())
self.assertEqual(
str(error.exception), "A bytes-like object is required, not 'str'"
)
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.return_value = secret_key
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(self.encryption.get_secret_key())
+ asyncio.run(self.encryption.get_secret_key())
self.assertEqual(str(error.exception), "database exception Connection failed.")
self.assertEqual(self.encryption.secret_key, None)
self.assertEqual(mock_join_keys.call_count, 1)
def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
"""Encrypt and decrypt with schema version 1.1, salt exists."""
- encrypted_msg = self.loop.run_until_complete(
+ encrypted_msg = asyncio.run(
self.encryption.encrypt(value, schema_version, salt)
)
- decrypted_msg = self.loop.run_until_complete(
+ decrypted_msg = asyncio.run(
self.encryption.decrypt(encrypted_msg, schema_version, salt)
)
self.assertEqual(value, decrypted_msg)
def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
"""Encrypt and decrypt with schema version 1.0, salt exists."""
schema_version = "1.0"
- encrypted_msg = self.loop.run_until_complete(
+ encrypted_msg = asyncio.run(
self.encryption.encrypt(value, schema_version, salt)
)
- decrypted_msg = self.loop.run_until_complete(
+ decrypted_msg = asyncio.run(
self.encryption.decrypt(encrypted_msg, schema_version, salt)
)
self.assertEqual(value, decrypted_msg)
"""Encrypt and decrypt with schema version 1.1, without salt."""
salt = None
with self.assertRaises(Exception) as error:
- self.loop.run_until_complete(
- self.encryption.encrypt(value, schema_version, salt)
- )
+ asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
# For those usages not covered by the Apache License, Version 2.0 please
# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
##
-
+import asyncio
import http
from osm_common.msgbase import MsgBase, MsgException
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
-def test_aiowrite(msg_base, event_loop):
+def test_aiowrite(msg_base):
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(
- msg_base.aiowrite("test", "test", "test", event_loop)
- )
+ asyncio.run(msg_base.aiowrite("test", "test", "test"))
assert str(excinfo.value).startswith(
exception_message("Method 'aiowrite' not implemented")
)
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
-def test_aioread(msg_base, event_loop):
+def test_aioread(msg_base):
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(msg_base.aioread("test", event_loop))
+ asyncio.run(msg_base.aioread("test"))
assert str(excinfo.value).startswith(
exception_message("Method 'aioread' not implemented")
)
# For those usages not covered by the Apache License, Version 2.0 please
# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
##
-
+import asyncio
import http
import logging
import os
(["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
],
)
-def test_aioread(msg_local_with_data, event_loop, topics, datas):
+def test_aioread(msg_local_with_data, topics, datas):
def write_to_topic(topics, datas):
time.sleep(2)
for topic in topics:
t.start()
for topic in topics:
for data in datas:
- recv = event_loop.run_until_complete(
- msg_local_with_data.aioread(topic, event_loop)
- )
+ recv = asyncio.run(msg_local_with_data.aioread(topic))
recv_topic, recv_key, recv_msg = recv
key = list(data.keys())[0]
val = data[key]
t.join()
-def test_aioread_exception(msg_local_with_data, event_loop):
+def test_aioread_exception(msg_local_with_data):
msg_local_with_data.files_read = MagicMock()
msg_local_with_data.files_read.__contains__.side_effect = Exception()
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
+ asyncio.run(msg_local_with_data.aioread("topic1"))
assert str(excinfo.value).startswith(empty_exception_message())
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
-def test_aioread_general_exception(msg_local_with_data, event_loop):
+def test_aioread_general_exception(msg_local_with_data):
msg_local_with_data.read = MagicMock()
msg_local_with_data.read.side_effect = Exception()
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
+ asyncio.run(msg_local_with_data.aioread("topic1"))
assert str(excinfo.value).startswith(empty_exception_message())
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
("test_topic", "test_none", None),
],
)
-def test_aiowrite(msg_local_config, event_loop, topic, key, msg):
+def test_aiowrite(msg_local_config, topic, key, msg):
file_path = msg_local_config.path + topic
- event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
+ asyncio.run(msg_local_config.aiowrite(topic, key, msg))
assert os.path.exists(file_path)
with open(file_path, "r") as stream:
("test_topic", "test_none", None, 3),
],
)
-def test_aiowrite_with_multiple_calls(
- msg_local_config, event_loop, topic, key, msg, times
-):
+def test_aiowrite_with_multiple_calls(msg_local_config, topic, key, msg, times):
file_path = msg_local_config.path + topic
for _ in range(times):
- event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
+ asyncio.run(msg_local_config.aiowrite(topic, key, msg))
assert os.path.exists(file_path)
with open(file_path, "r") as stream:
}
-def test_aiowrite_exception(msg_local_config, event_loop):
+def test_aiowrite_exception(msg_local_config):
msg_local_config.files_write = MagicMock()
msg_local_config.files_write.__contains__.side_effect = Exception()
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(msg_local_config.aiowrite("test", "test", "test"))
+ asyncio.run(msg_local_config.aiowrite("test", "test", "test"))
assert str(excinfo.value).startswith(empty_exception_message())
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+---
+upgrade:
+ - |
+ Upgrade Ubuntu from 20.04 to 22.04 and Python from 3.8 to 3.10.
+
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-attrs==22.2.0
- # via pytest
-coverage==7.1.0
+coverage==7.2.5
# via -r requirements-test.in
-exceptiongroup==1.1.0
+exceptiongroup==1.1.1
# via pytest
iniconfig==2.0.0
# via pytest
-nose2==0.12.0
+nose2==0.13.0
# via -r requirements-test.in
-packaging==23.0
+packaging==23.1
# via pytest
pluggy==1.0.0
# via pytest
-pytest==7.2.1
+pytest==7.3.1
# via -r requirements-test.in
tomli==2.0.1
# via pytest
# See the License for the specific language governing permissions and
# limitations under the License.
-pymongo<4
+pymongo
aiokafka
pyyaml==5.4.1
pycryptodome
dataclasses
-motor==1.3.1
\ No newline at end of file
+motor
\ No newline at end of file
# via aiokafka
dataclasses==0.6
# via -r requirements.in
+dnspython==2.3.0
+ # via pymongo
kafka-python==2.0.2
# via aiokafka
-motor==1.3.1
+motor==3.1.2
# via -r requirements.in
-packaging==23.0
+packaging==23.1
# via aiokafka
pycryptodome==3.17
# via -r requirements.in
-pymongo==3.13.0
+pymongo==4.3.3
# via
# -r requirements.in
# motor
[testenv]
usedevelop = True
-basepython = python3.8
+basepython = python3.10
setenv = VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt
coverage report --omit='*tests*'
coverage html -d ./cover --omit='*tests*'
coverage xml -o coverage.xml --omit=*tests*
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
[testenv:flake8]
[testenv:pip-compile]
deps = pip-tools==6.6.2
skip_install = true
-whitelist_externals = bash
+allowlist_externals = bash
[
commands =
- bash -c "for file in requirements*.in ; do \
python3 setup.py --command-packages=stdeb.command sdist_dsc
sh -c 'cd deb_dist/osm-common*/ && dpkg-buildpackage -rfakeroot -uc -us'
sh -c 'rm osm_common/requirements.txt'
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
[testenv:release_notes]
deps = reno
skip_install = true
-whitelist_externals = bash
+allowlist_externals = bash
commands =
reno new {posargs:new_feature}
bash -c "sed -i -e '1 e head -16 tox.ini' releasenotes/notes/{posargs:new_feature}*.yaml"