1 # Copyright 2022 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import os
import re
import tempfile
from unittest import TestCase
from unittest.mock import Mock

from osm_lcm.lcm import Lcm
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem

from osm_lcm.lcm_utils import LcmException
def create_lcm_config(
    source_path: str, destination_path: str, line_number=None
) -> None:
    """Create a new lcm_config file from the config file template.

    The template is copied to the destination. If a line number is provided,
    that line is left out of the copy.

    Args:
        source_path: (str) source file path
        destination_path: (str) destination file path
        line_number: (int) zero-based index of the line to be deleted;
            None (the default) copies the template unchanged

    Raises:
        LcmException: if line_number is smaller than zero
        IndexError: if line_number is beyond the last line of the template
    """
    # Validate before touching the destination so that a bad argument does
    # not leave a truncated file behind.
    if line_number is not None and line_number < 0:
        raise LcmException("Line number can not be smaller than zero")

    # The template is only read, so plain read mode is enough.
    with open(source_path, "r") as fs:
        # read and store all lines into list
        contents = fs.readlines()

    # "is not None" rather than truthiness: index 0 is a valid line to delete.
    if line_number is not None:
        contents.pop(line_number)

    with open(destination_path, "w") as fd:
        fd.writelines(contents)
def check_file_content(health_check_file: str) -> str:
    """Get the health check file contents.

    Args:
        health_check_file: (str) file path

    Returns:
        contents: (str) health check file content
    """
    with open(health_check_file, "r") as hc:
        return hc.read()
class TestLcm(TestCase):
    """Tests for the Lcm health check file and the kafka "ping" command."""

    # Epoch time as looked for in the health check file: ten integer digits,
    # a literal dot (escaped, so it cannot match an arbitrary character) and
    # five to eight fractional digits.
    EPOCH_PATTERN = r"[0-9]{10}\.[0-9]{5,8}"

    def setUp(self):
        """Build an Lcm instance on top of mocked database and filesystem."""
        self.config_file = os.path.join(
            os.getcwd(), "osm_lcm", "tests", "test_lcm_config_file.yaml"
        )
        self.config_file_without_storage_path = self._make_temp_file()

        # Reset the shared instance so state from a previous test is not reused.
        Database.instance = None
        self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
        Database().instance.db = self.db

        Filesystem.instance = None
        self.fs = Mock(
            Filesystem({"storage": {"driver": "local", "path": "/"}}).instance.fs
        )
        Filesystem.instance.fs = self.fs
        # NOTE(review): mirrors the "path" of the storage config above; this
        # assignment is not present in the reviewed text - confirm it is needed.
        self.fs.path = "/"

        self.my_lcm = Lcm(config_file=self.config_file)

    def _make_temp_file(self) -> str:
        """Create an empty temporary file that is removed when the test ends.

        Returns:
            (str) path of the temporary file
        """
        descriptor, path = tempfile.mkstemp()
        # mkstemp hands back an open descriptor; close it so it is not leaked.
        os.close(descriptor)
        self.addCleanup(self._remove_if_present, path)
        return path

    @staticmethod
    def _remove_if_present(path: str) -> None:
        """Delete path, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    @staticmethod
    def _ping_params(worker_id) -> dict:
        """Build the params of a ping message for the given worker id.

        NOTE(review): only the "worker_id" entry is visible in the reviewed
        text; the "to"/"from" entries are reconstructed - confirm them against
        Lcm.kafka_read_callback.
        """
        return {
            "to": "lcm",
            "from": "lcm",
            "worker_id": worker_id,
        }

    def _ping_and_find_epoch(self, topic: str, params: dict) -> list:
        """Send a ping on topic and return the epoch matches in the health file.

        A fresh, empty health check file is used for every call.
        """
        self.my_lcm.health_check_file = self._make_temp_file()
        self.my_lcm.kafka_read_callback(topic, "ping", params)
        written = check_file_content(self.my_lcm.health_check_file)
        return re.findall(self.EPOCH_PATTERN, written)

    def test_get_health_check_file_from_config_file(self):
        """The health check file path is taken from the config file."""
        self.assertEqual(self.my_lcm.health_check_file, "/tmp/storage/time_last_ping")

    def test_health_check_file_not_in_config_file(self):
        """Lcm raises LcmException for a config with line index 38 removed."""
        # presumably index 38 of the template is the storage path entry;
        # verify against test_lcm_config_file.yaml
        create_lcm_config(self.config_file, self.config_file_without_storage_path, 38)
        with self.assertRaises(LcmException):
            Lcm(config_file=self.config_file_without_storage_path)

    def test_kafka_admin_topic_ping_command(self):
        """A ping on the admin topic for this worker updates the health file."""
        params = self._ping_params(self.my_lcm.worker_id)
        # Epoch time is written in health check file.
        result = self._ping_and_find_epoch("admin", params)
        self.assertTrue(result)

    def test_kafka_wrong_topic_ping_command(self):
        """A ping on a topic other than admin leaves the health file untouched."""
        params = self._ping_params(self.my_lcm.worker_id)
        # Health check file is empty.
        result = self._ping_and_find_epoch("kafka", params)
        self.assertFalse(result)

    def test_kafka_admin_topic_ping_command_wrong_worker_id(self):
        """A ping addressed to another worker leaves the health file untouched."""
        # NOTE(review): the original worker id value is not visible in the
        # reviewed text; derive one that can never equal this worker's id.
        params = self._ping_params(str(self.my_lcm.worker_id) + "-wrong")
        # Health check file is empty.
        result = self._ping_and_find_epoch("admin", params)
        self.assertFalse(result)