1 # Copyright 2023 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Library to manage the relation for the data-platform products.
17 This library contains the Requires and Provides classes for handling the relation
18 between an application and multiple managed application supported by the data-team:
19 MySQL, Postgresql, MongoDB, Redis, and Kakfa.
21 ### Database (MySQL, Postgresql, MongoDB, and Redis)
24 This library is a uniform interface to a selection of common database
25 metadata, with added custom events that add convenience to database management,
26 and methods to consume the application related data.
29 Following an example of using the DatabaseCreatedEvent, in the context of the
30 application charm code:
34 from charms.data_platform_libs.v0.data_interfaces import (
39 class ApplicationCharm(CharmBase):
40 # Application charm that connects to database charms.
42 def __init__(self, *args):
43 super().__init__(*args)
45 # Charm events defined in the database requires charm library.
46 self.database = DatabaseRequires(self, relation_name="database", database_name="database")
47 self.framework.observe(self.database.on.database_created, self._on_database_created)
49 def _on_database_created(self, event: DatabaseCreatedEvent) -> None:
50 # Handle the created database
52 # Create configuration file for app
53 config_file = self._render_app_config_file(
59 # Start application with rendered configuration
60 self._start_application(config_file)
63 self.unit.status = ActiveStatus("received database credentials")
66 As shown above, the library provides some custom events to handle specific situations,
67 which are listed below:
69 - database_created: event emitted when the requested database is created.
70 - endpoints_changed: event emitted when the read/write endpoints of the database have changed.
71 - read_only_endpoints_changed: event emitted when the read-only endpoints of the database
72 have changed. Event is not triggered if read/write endpoints changed too.
74 If it is needed to connect multiple database clusters to the same relation endpoint
75 the application charm can implement the same code as if it would connect to only
76 one database cluster (like the above code example).
78 To differentiate multiple clusters connected to the same relation endpoint
79 the application charm can use the name of the remote application:
83 def _on_database_created(self, event: DatabaseCreatedEvent) -> None:
84 # Get the remote app name of the cluster that triggered this event
85 cluster = event.relation.app.name
88 It is also possible to provide an alias for each different database cluster/relation.
90 So, it is possible to differentiate the clusters in two ways.
91 The first is to use the remote application name, i.e., `event.relation.app.name`, as above.
93 The second way is to use different event handlers to handle each cluster events.
94 The implementation would be something like the following code:
98 from charms.data_platform_libs.v0.data_interfaces import (
103 class ApplicationCharm(CharmBase):
104 # Application charm that connects to database charms.
106 def __init__(self, *args):
107 super().__init__(*args)
109 # Define the cluster aliases and one handler for each cluster database created event.
110 self.database = DatabaseRequires(
112 relation_name="database",
113 database_name="database",
114 relations_aliases = ["cluster1", "cluster2"],
116 self.framework.observe(
117 self.database.on.cluster1_database_created, self._on_cluster1_database_created
119 self.framework.observe(
120 self.database.on.cluster2_database_created, self._on_cluster2_database_created
123 def _on_cluster1_database_created(self, event: DatabaseCreatedEvent) -> None:
124 # Handle the created database on the cluster named cluster1
126 # Create configuration file for app
127 config_file = self._render_app_config_file(
134 def _on_cluster2_database_created(self, event: DatabaseCreatedEvent) -> None:
135 # Handle the created database on the cluster named cluster2
137 # Create configuration file for app
138 config_file = self._render_app_config_file(
149 Following an example of using the DatabaseRequestedEvent, in the context of the
153 from charms.data_platform_libs.v0.data_interfaces import DatabaseProvides
155 class SampleCharm(CharmBase):
157 def __init__(self, *args):
158 super().__init__(*args)
159 # Charm events defined in the database provides charm library.
160 self.provided_database = DatabaseProvides(self, relation_name="database")
161 self.framework.observe(self.provided_database.on.database_requested,
162 self._on_database_requested)
163 # Database generic helper
164 self.database = DatabaseHelper()
166 def _on_database_requested(self, event: DatabaseRequestedEvent) -> None:
167 # Handle the event triggered by a new database requested in the relation
168 # Retrieve the database name using the charm library.
169 db_name = event.database
170 # generate a new user credential
171 username = self.database.generate_user()
172 password = self.database.generate_password()
173 # set the credentials for the relation
174 self.provided_database.set_credentials(event.relation.id, username, password)
175 # set other variables for the relation event.set_tls("False")
177 As shown above, the library provides a custom event (database_requested) to handle
178 the situation when an application charm requests a new database to be created.
179 It's preferred to subscribe to this event instead of relation changed event to avoid
180 creating a new database when other information other than a database name is
181 exchanged in the relation databag.
185 This library is the interface to use and interact with the Kafka charm. This library contains
186 custom events that add convenience to manage Kafka, and provides methods to consume the
187 application related data.
193 from charms.data_platform_libs.v0.data_interfaces import (
194 BootstrapServerChangedEvent,
199 class ApplicationCharm(CharmBase):
201 def __init__(self, *args):
202 super().__init__(*args)
203 self.kafka = KafkaRequires(self, "kafka_client", "test-topic")
204 self.framework.observe(
205 self.kafka.on.bootstrap_server_changed, self._on_kafka_bootstrap_server_changed
207 self.framework.observe(
208 self.kafka.on.topic_created, self._on_kafka_topic_created
211 def _on_kafka_bootstrap_server_changed(self, event: BootstrapServerChangedEvent):
212 # Event triggered when a bootstrap server was changed for this application
214 new_bootstrap_server = event.bootstrap_server
217 def _on_kafka_topic_created(self, event: TopicCreatedEvent):
218 # Event triggered when a topic was created for this application
219 username = event.username
220 password = event.password
223 bootstrap_server event.bootstrap_server
224 consumer_group_prefic = event.consumer_group_prefix
225 zookeeper_uris = event.zookeeper_uris
230 As shown above, the library provides some custom events to handle specific situations,
231 which are listed below:
233 - topic_created: event emitted when the requested topic is created.
234 - bootstrap_server_changed: event emitted when the bootstrap server have changed.
235 - credential_changed: event emitted when the credentials of Kafka changed.
239 Following the previous example, this is an example of the provider charm.
242 class SampleCharm(CharmBase):
244 from charms.data_platform_libs.v0.data_interfaces import (
249 def __init__(self, *args):
250 super().__init__(*args)
252 # Default charm events.
253 self.framework.observe(self.on.start, self._on_start)
255 # Charm events defined in the Kafka Provides charm library.
256 self.kafka_provider = KafkaProvides(self, relation_name="kafka_client")
257 self.framework.observe(self.kafka_provider.on.topic_requested, self._on_topic_requested)
258 # Kafka generic helper
259 self.kafka = KafkaHelper()
261 def _on_topic_requested(self, event: TopicRequestedEvent):
262 # Handle the on_topic_requested event.
265 relation_id = event.relation.id
266 # set connection info in the databag relation
267 self.kafka_provider.set_bootstrap_server(relation_id, self.kafka.get_bootstrap_server())
268 self.kafka_provider.set_credentials(relation_id, username=username, password=password)
269 self.kafka_provider.set_consumer_group_prefix(relation_id, ...)
270 self.kafka_provider.set_tls(relation_id, "False")
271 self.kafka_provider.set_zookeeper_uris(relation_id, ...)
274 As shown above, the library provides a custom event (topic_requested) to handle
275 the situation when an application charm requests a new topic to be created.
276 It is preferred to subscribe to this event instead of relation changed event to avoid
277 creating a new topic when other information other than a topic name is
278 exchanged in the relation databag.
283 from abc
import ABC
, abstractmethod
284 from collections
import namedtuple
285 from datetime
import datetime
286 from typing
import List
, Optional
288 from ops
.charm
import (
291 RelationChangedEvent
,
295 from ops
.framework
import EventSource
, Object
296 from ops
.model
import Relation
298 # The unique Charmhub library identifier, never change it
299 LIBID
= "6c3e6b6680d64e9c89e611d1a15f65be"
301 # Increment this major API version when introducing breaking changes
304 # Increment this PATCH version before using `charmcraft publish-lib` or reset
305 # to 0 if you are raising the major API version
308 PYDEPS
= ["ops>=2.0.0"]
310 logger
= logging
.getLogger(__name__
)
312 Diff
= namedtuple("Diff", "added changed deleted")
314 A tuple for storing the diff between two data mappings.
316 added - keys that were added
317 changed - keys that still exist but have new values
318 deleted - key that were deleted"""
321 def diff(event
: RelationChangedEvent
, bucket
: str) -> Diff
:
322 """Retrieves the diff of the data in the relation changed databag.
325 event: relation changed event.
326 bucket: bucket of the databag (app or unit)
329 a Diff instance containing the added, deleted and changed
330 keys from the event relation databag.
332 # Retrieve the old data from the data key in the application relation databag.
333 old_data
= json
.loads(event
.relation
.data
[bucket
].get("data", "{}"))
334 # Retrieve the new data from the event relation databag.
336 key
: value
for key
, value
in event
.relation
.data
[event
.app
].items() if key
!= "data"
339 # These are the keys that were added to the databag and triggered this event.
340 added
= new_data
.keys() - old_data
.keys()
341 # These are the keys that were removed from the databag and triggered this event.
342 deleted
= old_data
.keys() - new_data
.keys()
343 # These are the keys that already existed in the databag,
344 # but had their values changed.
345 changed
= {key
for key
in old_data
.keys() & new_data
.keys() if old_data
[key
] != new_data
[key
]}
346 # Convert the new_data to a serializable format and save it for a next diff check.
347 event
.relation
.data
[bucket
].update({"data": json
.dumps(new_data
)})
349 # Return the diff with all possible changes.
350 return Diff(added
, changed
, deleted
)
353 # Base DataProvides and DataRequires
356 class DataProvides(Object
, ABC
):
357 """Base provides-side of the data products relation."""
359 def __init__(self
, charm
: CharmBase
, relation_name
: str) -> None:
360 super().__init
__(charm
, relation_name
)
362 self
.local_app
= self
.charm
.model
.app
363 self
.local_unit
= self
.charm
.unit
364 self
.relation_name
= relation_name
365 self
.framework
.observe(
366 charm
.on
[relation_name
].relation_changed
,
367 self
._on
_relation
_changed
,
370 def _diff(self
, event
: RelationChangedEvent
) -> Diff
:
371 """Retrieves the diff of the data in the relation changed databag.
374 event: relation changed event.
377 a Diff instance containing the added, deleted and changed
378 keys from the event relation databag.
380 return diff(event
, self
.local_app
)
383 def _on_relation_changed(self
, event
: RelationChangedEvent
) -> None:
384 """Event emitted when the relation data has changed."""
385 raise NotImplementedError
387 def fetch_relation_data(self
) -> dict:
388 """Retrieves data from relation.
390 This function can be used to retrieve data from a relation
391 in the charm code when outside an event callback.
394 a dict of the values stored in the relation data bag
395 for all relation instances (indexed by the relation id).
398 for relation
in self
.relations
:
399 data
[relation
.id] = {
400 key
: value
for key
, value
in relation
.data
[relation
.app
].items() if key
!= "data"
404 def _update_relation_data(self
, relation_id
: int, data
: dict) -> None:
405 """Updates a set of key-value pairs in the relation.
407 This function writes in the application data bag, therefore,
408 only the leader unit can call it.
411 relation_id: the identifier for a particular relation.
412 data: dict containing the key-value pairs
413 that should be updated in the relation.
415 if self
.local_unit
.is_leader():
416 relation
= self
.charm
.model
.get_relation(self
.relation_name
, relation_id
)
417 relation
.data
[self
.local_app
].update(data
)
420 def relations(self
) -> List
[Relation
]:
421 """The list of Relation instances associated with this relation_name."""
422 return list(self
.charm
.model
.relations
[self
.relation_name
])
424 def set_credentials(self
, relation_id
: int, username
: str, password
: str) -> None:
427 This function writes in the application data bag, therefore,
428 only the leader unit can call it.
431 relation_id: the identifier for a particular relation.
432 username: user that was created.
433 password: password of the created user.
435 self
._update
_relation
_data
(
438 "username": username
,
439 "password": password
,
443 def set_tls(self
, relation_id
: int, tls
: str) -> None:
444 """Set whether TLS is enabled.
447 relation_id: the identifier for a particular relation.
448 tls: whether tls is enabled (True or False).
450 self
._update
_relation
_data
(relation_id
, {"tls": tls
})
452 def set_tls_ca(self
, relation_id
: int, tls_ca
: str) -> None:
453 """Set the TLS CA in the application relation databag.
456 relation_id: the identifier for a particular relation.
457 tls_ca: TLS certification authority.
459 self
._update
_relation
_data
(relation_id
, {"tls_ca": tls_ca
})
462 class DataRequires(Object
, ABC
):
463 """Requires-side of the relation."""
469 extra_user_roles
: str = None,
471 """Manager of base client relations."""
472 super().__init
__(charm
, relation_name
)
474 self
.extra_user_roles
= extra_user_roles
475 self
.local_app
= self
.charm
.model
.app
476 self
.local_unit
= self
.charm
.unit
477 self
.relation_name
= relation_name
478 self
.framework
.observe(
479 self
.charm
.on
[relation_name
].relation_joined
, self
._on
_relation
_joined
_event
481 self
.framework
.observe(
482 self
.charm
.on
[relation_name
].relation_changed
, self
._on
_relation
_changed
_event
486 def _on_relation_joined_event(self
, event
: RelationJoinedEvent
) -> None:
487 """Event emitted when the application joins the relation."""
488 raise NotImplementedError
491 def _on_relation_changed_event(self
, event
: RelationChangedEvent
) -> None:
492 raise NotImplementedError
494 def fetch_relation_data(self
) -> dict:
495 """Retrieves data from relation.
497 This function can be used to retrieve data from a relation
498 in the charm code when outside an event callback.
499 Function cannot be used in `*-relation-broken` events and will raise an exception.
502 a dict of the values stored in the relation data bag
503 for all relation instances (indexed by the relation ID).
506 for relation
in self
.relations
:
507 data
[relation
.id] = {
508 key
: value
for key
, value
in relation
.data
[relation
.app
].items() if key
!= "data"
512 def _update_relation_data(self
, relation_id
: int, data
: dict) -> None:
513 """Updates a set of key-value pairs in the relation.
515 This function writes in the application data bag, therefore,
516 only the leader unit can call it.
519 relation_id: the identifier for a particular relation.
520 data: dict containing the key-value pairs
521 that should be updated in the relation.
523 if self
.local_unit
.is_leader():
524 relation
= self
.charm
.model
.get_relation(self
.relation_name
, relation_id
)
525 relation
.data
[self
.local_app
].update(data
)
527 def _diff(self
, event
: RelationChangedEvent
) -> Diff
:
528 """Retrieves the diff of the data in the relation changed databag.
531 event: relation changed event.
534 a Diff instance containing the added, deleted and changed
535 keys from the event relation databag.
537 return diff(event
, self
.local_unit
)
540 def relations(self
) -> List
[Relation
]:
541 """The list of Relation instances associated with this relation_name."""
544 for relation
in self
.charm
.model
.relations
[self
.relation_name
]
545 if self
._is
_relation
_active
(relation
)
549 def _is_relation_active(relation
: Relation
):
551 _
= repr(relation
.data
)
557 def _is_resource_created_for_relation(relation
: Relation
):
559 "username" in relation
.data
[relation
.app
] and "password" in relation
.data
[relation
.app
]
562 def is_resource_created(self
, relation_id
: Optional
[int] = None) -> bool:
563 """Check if the resource has been created.
565 This function can be used to check if the Provider answered with data in the charm code
566 when outside an event callback.
569 relation_id (int, optional): When provided the check is done only for the relation id
570 provided, otherwise the check is done for all relations
576 IndexError: If relation_id is provided but that relation does not exist
578 if relation_id
is not None:
580 relation
= [relation
for relation
in self
.relations
if relation
.id == relation_id
][
583 return self
._is
_resource
_created
_for
_relation
(relation
)
585 raise IndexError(f
"relation id {relation_id} cannot be accessed")
590 self
._is
_resource
_created
_for
_relation
(relation
)
591 for relation
in self
.relations
602 class ExtraRoleEvent(RelationEvent
):
603 """Base class for data events."""
606 def extra_user_roles(self
) -> Optional
[str]:
607 """Returns the extra user roles that were requested."""
608 return self
.relation
.data
[self
.relation
.app
].get("extra-user-roles")
611 class AuthenticationEvent(RelationEvent
):
612 """Base class for authentication fields for events."""
615 def username(self
) -> Optional
[str]:
616 """Returns the created username."""
617 return self
.relation
.data
[self
.relation
.app
].get("username")
620 def password(self
) -> Optional
[str]:
621 """Returns the password for the created user."""
622 return self
.relation
.data
[self
.relation
.app
].get("password")
625 def tls(self
) -> Optional
[str]:
626 """Returns whether TLS is configured."""
627 return self
.relation
.data
[self
.relation
.app
].get("tls")
630 def tls_ca(self
) -> Optional
[str]:
631 """Returns TLS CA."""
632 return self
.relation
.data
[self
.relation
.app
].get("tls-ca")
635 # Database related events and fields
638 class DatabaseProvidesEvent(RelationEvent
):
639 """Base class for database events."""
642 def database(self
) -> Optional
[str]:
643 """Returns the database that was requested."""
644 return self
.relation
.data
[self
.relation
.app
].get("database")
647 class DatabaseRequestedEvent(DatabaseProvidesEvent
, ExtraRoleEvent
):
648 """Event emitted when a new database is requested for use on this relation."""
651 class DatabaseProvidesEvents(CharmEvents
):
654 This class defines the events that the database can emit.
657 database_requested
= EventSource(DatabaseRequestedEvent
)
660 class DatabaseRequiresEvent(RelationEvent
):
661 """Base class for database events."""
664 def endpoints(self
) -> Optional
[str]:
665 """Returns a comma separated list of read/write endpoints."""
666 return self
.relation
.data
[self
.relation
.app
].get("endpoints")
669 def read_only_endpoints(self
) -> Optional
[str]:
670 """Returns a comma separated list of read only endpoints."""
671 return self
.relation
.data
[self
.relation
.app
].get("read-only-endpoints")
674 def replset(self
) -> Optional
[str]:
675 """Returns the replicaset name.
679 return self
.relation
.data
[self
.relation
.app
].get("replset")
682 def uris(self
) -> Optional
[str]:
683 """Returns the connection URIs.
685 MongoDB, Redis, OpenSearch.
687 return self
.relation
.data
[self
.relation
.app
].get("uris")
690 def version(self
) -> Optional
[str]:
691 """Returns the version of the database.
693 Version as informed by the database daemon.
695 return self
.relation
.data
[self
.relation
.app
].get("version")
698 class DatabaseCreatedEvent(AuthenticationEvent
, DatabaseRequiresEvent
):
699 """Event emitted when a new database is created for use on this relation."""
702 class DatabaseEndpointsChangedEvent(AuthenticationEvent
, DatabaseRequiresEvent
):
703 """Event emitted when the read/write endpoints are changed."""
706 class DatabaseReadOnlyEndpointsChangedEvent(AuthenticationEvent
, DatabaseRequiresEvent
):
707 """Event emitted when the read only endpoints are changed."""
710 class DatabaseRequiresEvents(CharmEvents
):
713 This class defines the events that the database can emit.
716 database_created
= EventSource(DatabaseCreatedEvent
)
717 endpoints_changed
= EventSource(DatabaseEndpointsChangedEvent
)
718 read_only_endpoints_changed
= EventSource(DatabaseReadOnlyEndpointsChangedEvent
)
721 # Database Provider and Requires
724 class DatabaseProvides(DataProvides
):
725 """Provider-side of the database relations."""
727 on
= DatabaseProvidesEvents()
729 def __init__(self
, charm
: CharmBase
, relation_name
: str) -> None:
730 super().__init
__(charm
, relation_name
)
732 def _on_relation_changed(self
, event
: RelationChangedEvent
) -> None:
733 """Event emitted when the relation has changed."""
734 # Only the leader should handle this event.
735 if not self
.local_unit
.is_leader():
738 # Check which data has changed to emit customs events.
739 diff
= self
._diff
(event
)
741 # Emit a database requested event if the setup key (database name and optional
742 # extra user roles) was added to the relation databag by the application.
743 if "database" in diff
.added
:
744 self
.on
.database_requested
.emit(event
.relation
, app
=event
.app
, unit
=event
.unit
)
746 def set_endpoints(self
, relation_id
: int, connection_strings
: str) -> None:
747 """Set database primary connections.
749 This function writes in the application data bag, therefore,
750 only the leader unit can call it.
753 relation_id: the identifier for a particular relation.
754 connection_strings: database hosts and ports comma separated list.
756 self
._update
_relation
_data
(relation_id
, {"endpoints": connection_strings
})
758 def set_read_only_endpoints(self
, relation_id
: int, connection_strings
: str) -> None:
759 """Set database replicas connection strings.
761 This function writes in the application data bag, therefore,
762 only the leader unit can call it.
765 relation_id: the identifier for a particular relation.
766 connection_strings: database hosts and ports comma separated list.
768 self
._update
_relation
_data
(relation_id
, {"read-only-endpoints": connection_strings
})
770 def set_replset(self
, relation_id
: int, replset
: str) -> None:
771 """Set replica set name in the application relation databag.
776 relation_id: the identifier for a particular relation.
777 replset: replica set name.
779 self
._update
_relation
_data
(relation_id
, {"replset": replset
})
781 def set_uris(self
, relation_id
: int, uris
: str) -> None:
782 """Set the database connection URIs in the application relation databag.
784 MongoDB, Redis, and OpenSearch only.
787 relation_id: the identifier for a particular relation.
788 uris: connection URIs.
790 self
._update
_relation
_data
(relation_id
, {"uris": uris
})
792 def set_version(self
, relation_id
: int, version
: str) -> None:
793 """Set the database version in the application relation databag.
796 relation_id: the identifier for a particular relation.
797 version: database version.
799 self
._update
_relation
_data
(relation_id
, {"version": version
})
802 class DatabaseRequires(DataRequires
):
803 """Requires-side of the database relation."""
805 on
= DatabaseRequiresEvents()
812 extra_user_roles
: str = None,
813 relations_aliases
: List
[str] = None,
815 """Manager of database client relations."""
816 super().__init
__(charm
, relation_name
, extra_user_roles
)
817 self
.database
= database_name
818 self
.relations_aliases
= relations_aliases
820 # Define custom event names for each alias.
821 if relations_aliases
:
822 # Ensure the number of aliases does not exceed the maximum
823 # of connections allowed in the specific relation.
824 relation_connection_limit
= self
.charm
.meta
.requires
[relation_name
].limit
825 if len(relations_aliases
) != relation_connection_limit
:
827 f
"The number of aliases must match the maximum number of connections allowed in the relation. "
828 f
"Expected {relation_connection_limit}, got {len(relations_aliases)}"
831 for relation_alias
in relations_aliases
:
832 self
.on
.define_event(f
"{relation_alias}_database_created", DatabaseCreatedEvent
)
833 self
.on
.define_event(
834 f
"{relation_alias}_endpoints_changed", DatabaseEndpointsChangedEvent
836 self
.on
.define_event(
837 f
"{relation_alias}_read_only_endpoints_changed",
838 DatabaseReadOnlyEndpointsChangedEvent
,
841 def _assign_relation_alias(self
, relation_id
: int) -> None:
842 """Assigns an alias to a relation.
844 This function writes in the unit data bag.
847 relation_id: the identifier for a particular relation.
849 # If no aliases were provided, return immediately.
850 if not self
.relations_aliases
:
853 # Return if an alias was already assigned to this relation
854 # (like when there are more than one unit joining the relation).
856 self
.charm
.model
.get_relation(self
.relation_name
, relation_id
)
857 .data
[self
.local_unit
]
862 # Retrieve the available aliases (the ones that weren't assigned to any relation).
863 available_aliases
= self
.relations_aliases
[:]
864 for relation
in self
.charm
.model
.relations
[self
.relation_name
]:
865 alias
= relation
.data
[self
.local_unit
].get("alias")
867 logger
.debug("Alias %s was already assigned to relation %d", alias
, relation
.id)
868 available_aliases
.remove(alias
)
870 # Set the alias in the unit relation databag of the specific relation.
871 relation
= self
.charm
.model
.get_relation(self
.relation_name
, relation_id
)
872 relation
.data
[self
.local_unit
].update({"alias": available_aliases
[0]})
874 def _emit_aliased_event(self
, event
: RelationChangedEvent
, event_name
: str) -> None:
875 """Emit an aliased event to a particular relation if it has an alias.
878 event: the relation changed event that was received.
879 event_name: the name of the event to emit.
881 alias
= self
._get
_relation
_alias
(event
.relation
.id)
883 getattr(self
.on
, f
"{alias}_{event_name}").emit(
884 event
.relation
, app
=event
.app
, unit
=event
.unit
887 def _get_relation_alias(self
, relation_id
: int) -> Optional
[str]:
888 """Returns the relation alias.
891 relation_id: the identifier for a particular relation.
894 the relation alias or None if the relation was not found.
896 for relation
in self
.charm
.model
.relations
[self
.relation_name
]:
897 if relation
.id == relation_id
:
898 return relation
.data
[self
.local_unit
].get("alias")
901 def _on_relation_joined_event(self
, event
: RelationJoinedEvent
) -> None:
902 """Event emitted when the application joins the database relation."""
903 # If relations aliases were provided, assign one to the relation.
904 self
._assign
_relation
_alias
(event
.relation
.id)
906 # Sets both database and extra user roles in the relation
907 # if the roles are provided. Otherwise, sets only the database.
908 if self
.extra_user_roles
:
909 self
._update
_relation
_data
(
912 "database": self
.database
,
913 "extra-user-roles": self
.extra_user_roles
,
917 self
._update
_relation
_data
(event
.relation
.id, {"database": self
.database
})
919 def _on_relation_changed_event(self
, event
: RelationChangedEvent
) -> None:
920 """Event emitted when the database relation has changed."""
921 # Check which data has changed to emit customs events.
922 diff
= self
._diff
(event
)
924 # Check if the database is created
925 # (the database charm shared the credentials).
926 if "username" in diff
.added
and "password" in diff
.added
:
927 # Emit the default event (the one without an alias).
928 logger
.info("database created at %s", datetime
.now())
929 self
.on
.database_created
.emit(event
.relation
, app
=event
.app
, unit
=event
.unit
)
931 # Emit the aliased event (if any).
932 self
._emit
_aliased
_event
(event
, "database_created")
934 # To avoid unnecessary application restarts do not trigger
935 # “endpoints_changed“ event if “database_created“ is triggered.
938 # Emit an endpoints changed event if the database
939 # added or changed this info in the relation databag.
940 if "endpoints" in diff
.added
or "endpoints" in diff
.changed
:
941 # Emit the default event (the one without an alias).
942 logger
.info("endpoints changed on %s", datetime
.now())
943 self
.on
.endpoints_changed
.emit(event
.relation
, app
=event
.app
, unit
=event
.unit
)
945 # Emit the aliased event (if any).
946 self
._emit
_aliased
_event
(event
, "endpoints_changed")
948 # To avoid unnecessary application restarts do not trigger
949 # “read_only_endpoints_changed“ event if “endpoints_changed“ is triggered.
952 # Emit a read only endpoints changed event if the database
953 # added or changed this info in the relation databag.
954 if "read-only-endpoints" in diff
.added
or "read-only-endpoints" in diff
.changed
:
955 # Emit the default event (the one without an alias).
956 logger
.info("read-only-endpoints changed on %s", datetime
.now())
957 self
.on
.read_only_endpoints_changed
.emit(
958 event
.relation
, app
=event
.app
, unit
=event
.unit
961 # Emit the aliased event (if any).
962 self
._emit
_aliased
_event
(event
, "read_only_endpoints_changed")
965 # Kafka related events
968 class KafkaProvidesEvent(RelationEvent
):
969 """Base class for Kafka events."""
972 def topic(self
) -> Optional
[str]:
973 """Returns the topic that was requested."""
974 return self
.relation
.data
[self
.relation
.app
].get("topic")
977 class TopicRequestedEvent(KafkaProvidesEvent
, ExtraRoleEvent
):
978 """Event emitted when a new topic is requested for use on this relation."""
981 class KafkaProvidesEvents(CharmEvents
):
984 This class defines the events that the Kafka can emit.
987 topic_requested
= EventSource(TopicRequestedEvent
)
990 class KafkaRequiresEvent(RelationEvent
):
991 """Base class for Kafka events."""
994 def bootstrap_server(self
) -> Optional
[str]:
995 """Returns a a comma-seperated list of broker uris."""
996 return self
.relation
.data
[self
.relation
.app
].get("endpoints")
999 def consumer_group_prefix(self
) -> Optional
[str]:
1000 """Returns the consumer-group-prefix."""
1001 return self
.relation
.data
[self
.relation
.app
].get("consumer-group-prefix")
1004 def zookeeper_uris(self
) -> Optional
[str]:
1005 """Returns a comma separated list of Zookeeper uris."""
1006 return self
.relation
.data
[self
.relation
.app
].get("zookeeper-uris")
1009 class TopicCreatedEvent(AuthenticationEvent
, KafkaRequiresEvent
):
1010 """Event emitted when a new topic is created for use on this relation."""
1013 class BootstrapServerChangedEvent(AuthenticationEvent
, KafkaRequiresEvent
):
1014 """Event emitted when the bootstrap server is changed."""
1017 class KafkaRequiresEvents(CharmEvents
):
1020 This class defines the events that the Kafka can emit.
1023 topic_created
= EventSource(TopicCreatedEvent
)
1024 bootstrap_server_changed
= EventSource(BootstrapServerChangedEvent
)
1027 # Kafka Provides and Requires
1030 class KafkaProvides(DataProvides
):
1031 """Provider-side of the Kafka relation."""
1033 on
= KafkaProvidesEvents()
1035 def __init__(self
, charm
: CharmBase
, relation_name
: str) -> None:
1036 super().__init
__(charm
, relation_name
)
1038 def _on_relation_changed(self
, event
: RelationChangedEvent
) -> None:
1039 """Event emitted when the relation has changed."""
1040 # Only the leader should handle this event.
1041 if not self
.local_unit
.is_leader():
1044 # Check which data has changed to emit customs events.
1045 diff
= self
._diff
(event
)
1047 # Emit a topic requested event if the setup key (topic name and optional
1048 # extra user roles) was added to the relation databag by the application.
1049 if "topic" in diff
.added
:
1050 self
.on
.topic_requested
.emit(event
.relation
, app
=event
.app
, unit
=event
.unit
)
1052 def set_bootstrap_server(self
, relation_id
: int, bootstrap_server
: str) -> None:
1053 """Set the bootstrap server in the application relation databag.
1056 relation_id: the identifier for a particular relation.
1057 bootstrap_server: the bootstrap server address.
1059 self
._update
_relation
_data
(relation_id
, {"endpoints": bootstrap_server
})
1061 def set_consumer_group_prefix(self
, relation_id
: int, consumer_group_prefix
: str) -> None:
1062 """Set the consumer group prefix in the application relation databag.
1065 relation_id: the identifier for a particular relation.
1066 consumer_group_prefix: the consumer group prefix string.
1068 self
._update
_relation
_data
(relation_id
, {"consumer-group-prefix": consumer_group_prefix
})
1070 def set_zookeeper_uris(self
, relation_id
: int, zookeeper_uris
: str) -> None:
1071 """Set the zookeeper uris in the application relation databag.
1074 relation_id: the identifier for a particular relation.
1075 zookeeper_uris: comma-seperated list of ZooKeeper server uris.
1077 self
._update
_relation
_data
(relation_id
, {"zookeeper-uris": zookeeper_uris
})
1080 class KafkaRequires(DataRequires
):
1081 """Requires-side of the Kafka relation."""
1083 on
= KafkaRequiresEvents()
1085 def __init__(self
, charm
, relation_name
: str, topic
: str, extra_user_roles
: str = None):
1086 """Manager of Kafka client relations."""
1087 # super().__init__(charm, relation_name)
1088 super().__init
__(charm
, relation_name
, extra_user_roles
)
1092 def _on_relation_joined_event(self
, event
: RelationJoinedEvent
) -> None:
1093 """Event emitted when the application joins the Kafka relation."""
1094 # Sets both topic and extra user roles in the relation
1095 # if the roles are provided. Otherwise, sets only the topic.
1096 self
._update
_relation
_data
(
1099 "topic": self
.topic
,
1100 "extra-user-roles": self
.extra_user_roles
,
1102 if self
.extra_user_roles
is not None
1103 else {"topic": self
.topic
},
1106 def _on_relation_changed_event(self
, event
: RelationChangedEvent
) -> None:
1107 """Event emitted when the Kafka relation has changed."""
1108 # Check which data has changed to emit customs events.
1109 diff
= self
._diff
(event
)
1111 # Check if the topic is created
1112 # (the Kafka charm shared the credentials).
1113 if "username" in diff
.added
and "password" in diff
.added
:
1114 # Emit the default event (the one without an alias).
1115 logger
.info("topic created at %s", datetime
.now())
1116 self
.on
.topic_created
.emit(event
.relation
, app
=event
.app
, unit
=event
.unit
)
1118 # To avoid unnecessary application restarts do not trigger
1119 # “endpoints_changed“ event if “topic_created“ is triggered.
1122 # Emit an endpoints (bootstap-server) changed event if the Kakfa endpoints
1123 # added or changed this info in the relation databag.
1124 if "endpoints" in diff
.added
or "endpoints" in diff
.changed
:
1125 # Emit the default event (the one without an alias).
1126 logger
.info("endpoints changed on %s", datetime
.now())
1127 self
.on
.bootstrap_server_changed
.emit(
1128 event
.relation
, app
=event
.app
, unit
=event
.unit
1129 ) # here check if this is the right design