Prepare installer and pods for Rel TWELVE
[osm/devops.git] / installers / charm / kafka-k8s / reactive / kafka.py
1 # Copyright 2021 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: legal@canonical.com
17 #
18 # To get in touch with the maintainers, please contact:
19 # osm-charmers@lists.launchpad.net
20 ##
21
22 from charms.layer.caas_base import pod_spec_set
23 from charms.reactive import when, when_not, hook
24 from charms.reactive import endpoint_from_flag
25 from charms.reactive.flags import set_flag, clear_flag
26 from charmhelpers.core.hookenv import log, metadata, config
27 from charms import layer
28 from charms.osm.k8s import get_service_ip
29
30
@hook("upgrade-charm")
@when("leadership.is_leader")
def upgrade():
    """Force reconfiguration after a charm upgrade.

    Clears the "kafka-k8s.configured" flag; the configure handler in this
    module is gated on that flag being unset, so it becomes eligible to run
    again.
    """
    clear_flag("kafka-k8s.configured")
35
36
@when("config.changed")
@when("leadership.is_leader")
def restart():
    """Force reconfiguration when the charm config changes.

    Clears the "kafka-k8s.configured" flag so that the configure handler,
    which only runs while that flag is unset, can apply the new settings.
    """
    clear_flag("kafka-k8s.configured")
41
42
@when_not("zookeeper.ready")
@when("leadership.is_leader")
def waiting_for_zookeeper():
    """Set a waiting status on the leader while "zookeeper.ready" is unset."""
    layer.status.waiting("Waiting for Zookeeper to be ready")
47
48
@when("zookeeper.ready")
@when_not("kafka-k8s.configured")
@when("leadership.is_leader")
def configure():
    """Build the Kafka pod spec from the Zookeeper relation and apply it.

    Runs on the leader while "zookeeper.ready" is set and
    "kafka-k8s.configured" is not. The Zookeeper connection string is:

    * the first related unit's ``host:port`` when the "zookeeper-units"
      config option equals 1, or
    * a comma-separated list of ``zookeeper-k8s-<i>.<service>:<port>``
      entries, one per index below "zookeeper-units", where ``<service>``
      is the "zookeeper-service-name" config option.

    On success the pod spec is set and "kafka-k8s.configured" is raised.
    Any exception is turned into a blocked status rather than propagated.
    """
    layer.status.maintenance("Configuring kafka container")
    try:
        zk_endpoint = endpoint_from_flag("zookeeper.ready")
        first_zk = zk_endpoint.zookeepers()[0]
        zk_port = first_zk["port"]
        if not zk_port:
            # No port published by the first Zookeeper unit: nothing to do.
            return

        charm_config = config()
        pod_base_name = "zookeeper-k8s"
        service_name = charm_config.get("zookeeper-service-name")
        unit_count = charm_config.get("zookeeper-units")

        if unit_count == 1:
            zookeeper_uri = "{}:{}".format(first_zk["host"], zk_port)
        else:
            # One entry per Zookeeper pod index, all sharing the same port.
            zookeeper_uri = ",".join(
                "{}-{}.{}:{}".format(pod_base_name, index, service_name, zk_port)
                for index in range(unit_count)
            )

        pod_spec = make_pod_spec(zookeeper_uri)
        log("set pod spec:\n{}".format(pod_spec))
        pod_spec_set(pod_spec)
        set_flag("kafka-k8s.configured")
    except Exception as err:
        layer.status.blocked("k8s spec failed to deploy: {}".format(err))
79
80
@when("kafka-k8s.configured")
def set_kafka_active():
    """Set an active status whenever "kafka-k8s.configured" is set."""
    layer.status.active("ready")
84
85
@when("zookeeper.ready")
@when_not("leadership.is_leader")
def non_leaders_active():
    """Set an active status on non-leader units once Zookeeper is ready.

    The other status-setting handlers in this module are gated on
    leadership or on the "kafka-k8s.configured" flag; this handler covers
    units that are not the leader.
    """
    layer.status.active("ready")
90
91
@when("kafka-k8s.configured")
@when("kafka.joined", "zookeeper.ready")
def serve_client():
    """Send Kafka connection details over the "kafka.joined" endpoint.

    When both endpoints resolve and a service IP is available for "kafka",
    the Kafka port and service IP are sent, followed by the list of
    Zookeeper units, and the "kafka.joined" flag is then cleared. If either
    endpoint or the service IP is missing, nothing is sent and the flag is
    left as is.

    Failures are logged and not re-raised.
    """
    layer.status.maintenance("Sending kafka configuration")
    try:
        kafka_endpoint = endpoint_from_flag("kafka.joined")
        zk_endpoint = endpoint_from_flag("zookeeper.ready")
        if not (zk_endpoint and kafka_endpoint):
            return

        service_ip = get_service_ip("kafka")
        if not service_ip:
            return

        kafka_endpoint.send_connection(get_kafka_port(), service_ip)
        kafka_endpoint.send_zookeepers(zk_endpoint.zookeepers())
        clear_flag("kafka.joined")
    except Exception as err:
        log("Fail sending kafka configuration: {}".format(err))
109
110
def make_pod_spec(zookeeper_uri):
    """Render the Kubernetes pod spec from the bundled template.

    The template at ``reactive/spec_template.yaml`` is filled in using
    ``%``-style mapping substitution. The mapping starts with the charm
    name, docker image, Kafka port and Zookeeper URI, and is then updated
    with every charm config option, so a config option whose name matches
    one of those keys takes precedence.

    Args:
        zookeeper_uri (str): Zookeeper hosts appended by comma.
    Returns:
        pod_spec: Pod specification for Kubernetes
    """
    with open("reactive/spec_template.yaml") as template_file:
        template = template_file.read()

    charm_metadata = metadata()
    charm_config = config()

    substitutions = dict(
        name=charm_metadata.get("name"),
        docker_image=charm_config.get("image"),
        port=get_kafka_port(),
        zookeeper_uri=zookeeper_uri,
    )
    # Config options are applied last, so they override the keys above.
    substitutions.update(charm_config)
    return template % substitutions
134
135
def get_kafka_port():
    """Return the value of the "advertised-port" charm config option.

    Returns:
        The configured value, or None when the option is not present.
    """
    return config().get("advertised-port")