1 # Copyright 2021 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: legal@canonical.com
18 # To get in touch with the maintainers, please contact:
19 # osm-charmers@lists.launchpad.net
22 from charms
.layer
.caas_base
import pod_spec_set
23 from charms
.reactive
import when
, when_not
, hook
24 from charms
.reactive
import endpoint_from_flag
25 from charms
.reactive
.flags
import set_flag
, clear_flag
26 from charmhelpers
.core
.hookenv
import log
, metadata
, config
27 from charms
import layer
28 from charms
.osm
.k8s
import get_service_ip
31 @hook("upgrade-charm")
32 @when("leadership.is_leader")
34 clear_flag("kafka-k8s.configured")
37 @when("config.changed")
38 @when("leadership.is_leader")
40 clear_flag("kafka-k8s.configured")
@when_not("zookeeper.ready")
@when("leadership.is_leader")
def waiting_for_zookeeper():
    """Report a waiting status while Zookeeper is not ready.

    Gated on the "leadership.is_leader" flag being set and the
    "zookeeper.ready" flag not being set.
    """
    status_message = "Waiting for Zookeeper to be ready"
    layer.status.waiting(status_message)
49 @when("zookeeper.ready")
50 @when_not("kafka-k8s.configured")
51 @when("leadership.is_leader")
53 layer
.status
.maintenance("Configuring kafka container")
55 zookeeper
= endpoint_from_flag("zookeeper.ready")
56 zk_units
= zookeeper
.zookeepers()
61 zk_pod_base_name
= "zookeeper-k8s"
62 zk_service_name
= cfg
.get("zookeeper-service-name")
63 zk_num
= cfg
.get("zookeeper-units")
65 zookeeper_uri
= "{}:{}".format(zk_unit
["host"], zk_unit
["port"])
67 for i
in range(0, zk_num
):
70 zookeeper_uri
+= "{}-{}.{}:{}".format(
71 zk_pod_base_name
, i
, zk_service_name
, zk_unit
["port"]
73 spec
= make_pod_spec(zookeeper_uri
)
74 log("set pod spec:\n{}".format(spec
))
76 set_flag("kafka-k8s.configured")
77 except Exception as e
:
78 layer
.status
.blocked("k8s spec failed to deploy: {}".format(e
))
@when("kafka-k8s.configured")
def set_kafka_active():
    """Report an active status once the "kafka-k8s.configured" flag is set."""
    status_message = "ready"
    layer.status.active(status_message)
@when("zookeeper.ready")
@when_not("leadership.is_leader")
def non_leaders_active():
    """Report an active status on units that do not hold leadership.

    Gated on the "zookeeper.ready" flag being set and the
    "leadership.is_leader" flag not being set.
    """
    status_message = "ready"
    layer.status.active(status_message)
92 @when("kafka-k8s.configured")
93 @when("kafka.joined", "zookeeper.ready")
95 layer
.status
.maintenance("Sending kafka configuration")
97 kafka
= endpoint_from_flag("kafka.joined")
98 zookeeper
= endpoint_from_flag("zookeeper.ready")
99 if zookeeper
and kafka
:
100 service_ip
= get_service_ip("kafka")
102 kafka
.send_connection(
103 get_kafka_port(), service_ip
,
105 kafka
.send_zookeepers(zookeeper
.zookeepers())
106 clear_flag("kafka.joined")
107 except Exception as e
:
108 log("Fail sending kafka configuration: {}".format(e
))
111 def make_pod_spec(zookeeper_uri
):
112 """Make pod specification for Kubernetes
115 zookeeper_uri (str): Zookeeper hosts appended by comma.
117 pod_spec: Pod specification for Kubernetes
120 with
open("reactive/spec_template.yaml") as spec_file
:
121 pod_spec_template
= spec_file
.read()
127 "name": md
.get("name"),
128 "docker_image": cfg
.get("image"),
129 "port": get_kafka_port(),
130 "zookeeper_uri": zookeeper_uri
,
133 return pod_spec_template
% data
def get_kafka_port():
    """Return the Kafka port.

    Returns:
        The value of the "advertised-port" charm configuration option.
    """
    # Read the charm configuration at call time through the `config` helper
    # imported from charmhelpers.core.hookenv, so the function does not rely
    # on a `cfg` name being bound elsewhere.
    return config().get("advertised-port")