Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
# Copyright 2022 David Garcia
# See LICENSE file for licensing details.
#
# Learn more at: https://juju.is/docs/sdk
import json
import logging
from typing import Tuple
from ops.charm import CharmBase
from ops.main import main
from ops.model import ActiveStatus, BlockedStatus
from ldap3 import Server, Connection, ALL, SUBTREE, ALL_ATTRIBUTES
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Distinguished name and password of an LDAP admin entry.
# NOTE(review): presumably the bind credentials for the OpenLDAP server this
# charm talks to. They are hard-coded in source, so confirm this is only meant
# for demo/test deployments before using it anywhere else.
USER = "cn=admin,dc=example,dc=org"
PASSWORD = "admin"
class Service:
    """Read-only view over a single service entry of the OSM configuration."""

    def __init__(self, service_info):
        """Keep the raw service mapping for later lookups.

        Args:
            service_info: mapping indexed with ``"ip"`` and ``"ports"`` by the
                accessors of this class.
        """
        self._service_info = service_info

    @property
    def ip(self):
        """Return the first element of the service's ``"ip"`` entry."""
        addresses = self._service_info["ip"]
        return addresses[0]

    def get_port(self, port_name):
        """Return the ``"port"`` value stored under *port_name* in ``"ports"``."""
        port_table = self._service_info["ports"]
        port_entry = port_table[port_name]
        return port_entry["port"]
class OsmConfig:
    """Accessor for the services described in the charm's ``osm-config`` option."""

    def __init__(self, charm: CharmBase) -> None:
        """Keep a reference to *charm*; its ``config`` is read on every lookup."""
        self._charm = charm

    def get_service(self, service_name: str) -> Service:
        """Return the first service whose name contains *service_name*.

        The ``osm-config`` option is parsed as JSON on each call and the
        candidates are taken from ``["v0"]["k8s"]["services"]``.

        Args:
            service_name: substring to look for in the configured service names.

        Returns:
            A ``Service`` wrapping the values of the first matching entry.

        Raises:
            IndexError: if no configured service name contains *service_name*.
        """
        osm_config = json.loads(self._charm.config["osm-config"])
        services = osm_config["v0"]["k8s"]["services"]
        # Substring match; the first hit in the mapping's iteration order wins.
        for name, values in services.items():
            if service_name in name:
                return Service(values)
        # Same exception type an empty-list lookup would raise, but with a message
        # that tells the operator which service is missing.
        raise IndexError(f"no service matching {service_name!r} found in osm-config")
class Ldap:
    """Small LDAP client that opens one short-lived connection per query."""

    def __init__(self, ldap_uri: str, user=None, password=None) -> None:
        """Remember where and how to bind; no connection is opened here.

        Args:
            ldap_uri: server address handed to ``ldap3.Server``.
            user: bind DN; the module-level ``USER`` is used when omitted.
            password: bind password; the module-level ``PASSWORD`` is used
                when omitted.
        """
        self.ldap_uri = ldap_uri
        # Defaults are resolved here instead of being repeated as literals.
        self.user = USER if user is None else user
        self.password = PASSWORD if password is None else password
        self.connection = None

    def _connect(self):
        """Create a connection to the server and bind with the stored credentials.

        Raises:
            Exception: if the bind is rejected.
        """
        server = Server(self.ldap_uri, get_info=ALL)
        self.connection = Connection(
            server, user=self.user, password=self.password
        )
        if not self.connection.bind():
            raise Exception("cannot connect to ldap server")

    def _disconnect(self):
        """Unbind the current connection, if there is one and it is still open."""
        if not self.connection or self.connection.closed:
            return
        self.connection.unbind()

    def get_user_info(self, cn, dc):
        """Search the subtree rooted at ``"{cn},{dc}"`` and return its entries.

        Args:
            cn: relative DN of the user, e.g. ``"cn=jdoe"``.
            dc: domain components, e.g. ``"dc=example,dc=org"``.

        Returns:
            ``connection.entries`` when the search reports success, otherwise
            ``None``.

        Raises:
            Exception: if the bind is rejected (see ``_connect``).
        """
        try:
            # Connecting inside the ``try`` guarantees that a connection whose
            # bind was rejected is still unbound by the ``finally`` clause.
            self._connect()
            found = self.connection.search(
                search_base=f"{cn},{dc}",
                search_filter="(objectclass=*)",
                search_scope=SUBTREE,
                attributes=ALL_ATTRIBUTES,
            )
            if found:
                return self.connection.entries
            return None
        finally:
            self._disconnect()
class OpenldapOperatorCharm(CharmBase):
    """Charm the service."""

    def __init__(self, *args):
        """Set up the OSM config accessor and register the event handlers."""
        super().__init__(*args)
        self.osm_config = OsmConfig(self)
        self.framework.observe(self.on.config_changed, self._on_config_changed)
        self.framework.observe(
            self.on.get_user_info_action, self._on_get_user_info_action
        )

    def _on_config_changed(self, _):
        """Handler for config-changed event.

        Blocks the unit while the ``osm-config`` option is empty or unset and
        marks it active otherwise.
        """
        osm_config = self.config.get("osm-config")
        if not osm_config:
            self.unit.status = BlockedStatus("osm-config missing")
            return
        # Lazy %-style arguments: the message is only formatted if it is emitted.
        logger.info("osm-config=%s", osm_config)
        self.unit.status = ActiveStatus()

    def _on_get_user_info_action(self, event):
        """Handler for get-user-info action.

        Reads the ``cn`` and ``dc`` action parameters, turns the dotted ``dc``
        value into ``dc=...`` components, queries the LDAP service located via
        ``osm-config`` and stores the result under the ``output`` key. Any
        failure is logged with its traceback and reported through
        ``event.fail``.
        """
        cn_param = event.params["cn"]
        dc_param = event.params["dc"]
        cn = f"cn={cn_param}"
        # "example.org" -> "dc=example,dc=org"
        dc = ",".join(f"dc={dc_component}" for dc_component in dc_param.split("."))
        try:
            openldap_service = self.osm_config.get_service("stable-openldap")
            ldap_uri = (
                f'ldap://{openldap_service.ip}:{openldap_service.get_port("ldap-port")}'
            )
            ldap = Ldap(ldap_uri)
            user_info = ldap.get_user_info(cn, dc)
            event.set_results({"output": user_info})
        except Exception as e:
            # Top-level action boundary: keep the traceback in the unit log and
            # report a short message to the caller of the action.
            logger.exception("get-user-info action failed")
            event.fail(f"Failed getting user info: {e}")
# Script entry point: hand the charm class to the `main` function imported
# from `ops.main` when this file is executed directly.
if __name__ == "__main__":
    main(OpenldapOperatorCharm)