# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
18 from unittest
import TestCase
19 from unittest
.mock
import Mock
21 from osm_lcm
.lcm
import Lcm
22 from osm_lcm
.data_utils
.database
.database
import Database
23 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
25 from osm_lcm
.lcm_utils
import LcmException
28 def create_lcm_config(
29 source_path
: str, destination_path
: str, line_number
=None
31 """This function creates new lcm_config files by
32 using the config file template. If line number is provided,
33 it removes the line from file.
35 source_path: (str) source file path
36 destination_path: (str) destination file path
37 line_number: (int) line to be deleted
39 with
open(source_path
, "r+") as fs
:
40 # read and store all lines into list
41 contents
= fs
.readlines()
43 with
open(destination_path
, "w") as fd
:
46 raise LcmException("Line number can not be smaller than zero")
47 contents
.pop(line_number
)
48 contents
= "".join(contents
)
52 def check_file_content(health_check_file
: str) -> str:
53 """Get the health check file contents
55 health_check_file: (str) file path
58 contents: (str) health check file content
60 with
open(health_check_file
, "r") as hc
:
65 class TestLcm(TestCase
):
67 self
.config_file
= os
.getcwd() + "/osm_lcm/tests/test_lcm_config_file.yaml"
68 self
.config_file_without_storage_path
= tempfile
.mkstemp()[1]
69 Database
.instance
= None
70 self
.db
= Mock(Database({"database": {"driver": "memory"}}).instance
.db
)
71 Database().instance
.db
= self
.db
72 Filesystem
.instance
= None
74 Filesystem({"storage": {"driver": "local", "path": "/"}}).instance
.fs
76 Filesystem
.instance
.fs
= self
.fs
78 self
.my_lcm
= Lcm(config_file
=self
.config_file
)
def test_get_health_check_file_from_config_file(self):
    """Check the health check file path exposed by the Lcm instance.

    Asserts that ``self.my_lcm.health_check_file`` equals
    "/tmp/storage/time_last_ping".
    """
    # NOTE(review): the expected path presumably comes from the test
    # config file the Lcm instance was built with - confirm against it.
    self.assertEqual(self.my_lcm.health_check_file, "/tmp/storage/time_last_ping")
def test_health_check_file_not_in_config_file(self):
    """Expect LcmException when Lcm is built from the modified config copy.

    ``create_lcm_config`` is called with the template config file, the
    temporary destination path and line number 38; constructing ``Lcm``
    from that destination file must raise ``LcmException``.
    """
    # NOTE(review): 38 is presumably the index of the storage path entry
    # in the template config - confirm against the YAML file.
    create_lcm_config(self.config_file, self.config_file_without_storage_path, 38)
    with self.assertRaises(LcmException):
        Lcm(config_file=self.config_file_without_storage_path)
88 def test_kafka_admin_topic_ping_command(self
):
92 "worker_id": self
.my_lcm
.worker_id
,
94 self
.my_lcm
.health_check_file
= tempfile
.mkstemp()[1]
95 self
.my_lcm
.kafka_read_callback("admin", "ping", params
)
96 pattern
= "[0-9]{10}.[0-9]{5,8}"
97 # Epoch time is written in health check file.
98 result
= re
.findall(pattern
, check_file_content(self
.my_lcm
.health_check_file
))
99 self
.assertTrue(result
)
101 def test_kafka_wrong_topic_ping_command(self
):
105 "worker_id": self
.my_lcm
.worker_id
,
107 self
.my_lcm
.health_check_file
= tempfile
.mkstemp()[1]
108 self
.my_lcm
.kafka_read_callback("kafka", "ping", params
)
109 pattern
= "[0-9]{10}.[0-9]{5,8}"
110 # Health check file is empty.
111 result
= re
.findall(pattern
, check_file_content(self
.my_lcm
.health_check_file
))
112 self
.assertFalse(result
)
114 def test_kafka_admin_topic_ping_command_wrong_worker_id(self
):
120 self
.my_lcm
.health_check_file
= tempfile
.mkstemp()[1]
121 self
.my_lcm
.kafka_read_callback("admin", "ping", params
)
122 pattern
= "[0-9]{10}.[0-9]{5,8}"
123 # Health check file is empty.
124 result
= re
.findall(pattern
, check_file_content(self
.my_lcm
.health_check_file
))
125 self
.assertFalse(result
)