#!/usr/bin/env python3
# Copyright 2022 David Garcia
# See LICENSE file for licensing details.
#
# Learn more at: https://juju.is/docs/sdk

"""Operator charm exposing a ``get-user-info`` action backed by an LDAP server.

The LDAP endpoint is resolved from the ``osm-config`` charm configuration
option, which is parsed as a JSON document.
"""

import json
import logging
from typing import Tuple

from ops.charm import CharmBase
from ops.main import main
from ops.model import ActiveStatus, BlockedStatus
from ldap3 import Server, Connection, ALL, SUBTREE, ALL_ATTRIBUTES

logger = logging.getLogger(__name__)

# Credentials used to bind to the LDAP server.
USER = "cn=admin,dc=example,dc=org"
PASSWORD = "admin"


class Service:
    """Accessor for a single service entry taken from the osm-config document."""

    def __init__(self, service_info):
        """Store the raw service mapping.

        Args:
            service_info: mapping read by ``ip`` (key ``"ip"``) and
                ``get_port`` (key ``"ports"``).
        """
        self._service_info = service_info

    @property
    def ip(self):
        """Return the first element of the service's ``"ip"`` list."""
        return self._service_info["ip"][0]

    def get_port(self, port_name):
        """Return the ``"port"`` value of the entry named ``port_name``.

        Raises:
            KeyError: if ``port_name`` is not present under ``"ports"``.
        """
        return self._service_info["ports"][port_name]["port"]


class OsmConfig:
    """Reader for the JSON document stored in the ``osm-config`` option."""

    def __init__(self, charm: CharmBase) -> None:
        """Keep a reference to the charm whose configuration is read."""
        self._charm = charm

    def get_service(self, service_name: str) -> Service:
        """Return the first service whose name contains ``service_name``.

        The ``osm-config`` option is parsed on every call and services are
        looked up under ``v0 -> k8s -> services``. Matching is by substring,
        not equality.

        Raises:
            IndexError: if no service name contains ``service_name``.
        """
        osm_config = json.loads(self._charm.config["osm-config"])
        all_services = osm_config["v0"]["k8s"]["services"]
        matches = [
            s_values for s_name, s_values in all_services.items() if service_name in s_name
        ]
        if not matches:
            # Same exception type the bare ``matches[0]`` would raise, but with
            # a message that tells the operator what is actually missing.
            raise IndexError(f"no service matching '{service_name}' found in osm-config")
        return Service(matches[0])


class Ldap:
    """Minimal LDAP client that opens one connection per query."""

    def __init__(self, ldap_uri: str) -> None:
        """Remember the server URI; no connection is opened yet."""
        self.ldap_uri = ldap_uri
        self.connection = None

    def _connect(self):
        """Create a connection and bind with the module-level credentials.

        Raises:
            Exception: if the bind is rejected.
        """
        server = Server(self.ldap_uri, get_info=ALL, connect_timeout=10)
        # Use the module constants rather than repeating the literals here.
        self.connection = Connection(server, user=USER, password=PASSWORD)
        if not self.connection.bind():
            raise Exception("cannot connect to ldap server")

    def _disconnect(self):
        """Unbind the current connection if there is one and it is still open."""
        if not self.connection or self.connection.closed:
            return
        self.connection.unbind()

    def get_user_info(self, cn, dc):
        """Search the subtree rooted at ``"{cn},{dc}"`` and return its entries.

        Args:
            cn: already-formatted RDN, e.g. ``"cn=john"``.
            dc: already-formatted domain components, e.g. ``"dc=example,dc=org"``.

        Returns:
            ``self.connection.entries`` when the search call returns a truthy
            value, otherwise ``None``.
        """
        self._connect()
        try:
            found = self.connection.search(
                search_base=f"{cn},{dc}",
                search_filter="(objectclass=*)",
                search_scope=SUBTREE,
                attributes=ALL_ATTRIBUTES,
            )
            if found:
                return self.connection.entries
            return None
        finally:
            # Always release the connection, including when search raises.
            self._disconnect()


class OpenldapOperatorCharm(CharmBase):
    """Charm the service."""

    def __init__(self, *args):
        super().__init__(*args)
        self.osm_config = OsmConfig(self)
        self.framework.observe(self.on.config_changed, self._on_config_changed)
        self.framework.observe(
            self.on.get_user_info_action, self._on_get_user_info_action
        )

    def _on_config_changed(self, _):
        """Handler for config-changed event.

        Blocks the unit when ``osm-config`` is unset or empty; otherwise logs
        the value and marks the unit active.
        """
        osm_config = self.config.get("osm-config")
        if not osm_config:
            self.unit.status = BlockedStatus("osm-config missing")
            return
        # Lazy %-formatting: same message, built only if the record is emitted.
        logger.info("osm-config=%s", osm_config)
        self.unit.status = ActiveStatus()

    def _on_get_user_info_action(self, event):
        """Handler for get-user-info action.

        Reads the ``cn`` and ``dc`` action parameters, turns the dotted ``dc``
        value into ``dc=...`` components, queries the LDAP server behind the
        ``stable-openldap`` service and publishes the outcome under the
        ``output`` result key. On any error the action is marked failed and the
        error text is published instead.
        """
        cn_param = event.params["cn"]
        dc_param = event.params["dc"]
        cn = f"cn={cn_param}"
        dc = ",".join(f"dc={dc_component}" for dc_component in dc_param.split("."))
        try:
            openldap_service = self.osm_config.get_service("stable-openldap")
            ldap_uri = (
                f'ldap://{openldap_service.ip}:{openldap_service.get_port("ldap-port")}'
            )
            result = Ldap(ldap_uri).get_user_info(cn, dc)
        except Exception as e:
            # Top-level boundary of the action: report the failure to the
            # operator instead of letting the hook crash.
            result = e
            logger.error(e)
            event.fail(f"Failed getting user info: {e}")
        # Publish plain text rather than an exception or ldap3 entry objects.
        event.set_results({"output": str(result)})


if __name__ == "__main__":
    main(OpenldapOperatorCharm)