Integration of OSM Charms with new MongoDB
[osm/devops.git] / installers / charm / osm-ro / lib / charms / data_platform_libs / v0 / data_interfaces.py
1 # Copyright 2023 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Library to manage the relation for the data-platform products.
16
17 This library contains the Requires and Provides classes for handling the relation
18 between an application and multiple managed application supported by the data-team:
19 MySQL, Postgresql, MongoDB, Redis, and Kafka.
20
21 ### Database (MySQL, Postgresql, MongoDB, and Redis)
22
23 #### Requires Charm
24 This library is a uniform interface to a selection of common database
25 metadata, with added custom events that add convenience to database management,
26 and methods to consume the application related data.
27
28
29 Following an example of using the DatabaseCreatedEvent, in the context of the
30 application charm code:
31
32 ```python
33
34 from charms.data_platform_libs.v0.data_interfaces import (
35 DatabaseCreatedEvent,
36 DatabaseRequires,
37 )
38
39 class ApplicationCharm(CharmBase):
40 # Application charm that connects to database charms.
41
42 def __init__(self, *args):
43 super().__init__(*args)
44
45 # Charm events defined in the database requires charm library.
46 self.database = DatabaseRequires(self, relation_name="database", database_name="database")
47 self.framework.observe(self.database.on.database_created, self._on_database_created)
48
49 def _on_database_created(self, event: DatabaseCreatedEvent) -> None:
50 # Handle the created database
51
52 # Create configuration file for app
53 config_file = self._render_app_config_file(
54 event.username,
55 event.password,
56 event.endpoints,
57 )
58
59 # Start application with rendered configuration
60 self._start_application(config_file)
61
62 # Set active status
63 self.unit.status = ActiveStatus("received database credentials")
64 ```
65
66 As shown above, the library provides some custom events to handle specific situations,
67 which are listed below:
68
69 - database_created: event emitted when the requested database is created.
70 - endpoints_changed: event emitted when the read/write endpoints of the database have changed.
71 - read_only_endpoints_changed: event emitted when the read-only endpoints of the database
72 have changed. Event is not triggered if read/write endpoints changed too.
73
74 If it is needed to connect multiple database clusters to the same relation endpoint
75 the application charm can implement the same code as if it would connect to only
76 one database cluster (like the above code example).
77
78 To differentiate multiple clusters connected to the same relation endpoint
79 the application charm can use the name of the remote application:
80
81 ```python
82
83 def _on_database_created(self, event: DatabaseCreatedEvent) -> None:
84 # Get the remote app name of the cluster that triggered this event
85 cluster = event.relation.app.name
86 ```
87
88 It is also possible to provide an alias for each different database cluster/relation.
89
90 So, it is possible to differentiate the clusters in two ways.
91 The first is to use the remote application name, i.e., `event.relation.app.name`, as above.
92
93 The second way is to use different event handlers to handle each cluster events.
94 The implementation would be something like the following code:
95
96 ```python
97
98 from charms.data_platform_libs.v0.data_interfaces import (
99 DatabaseCreatedEvent,
100 DatabaseRequires,
101 )
102
103 class ApplicationCharm(CharmBase):
104 # Application charm that connects to database charms.
105
106 def __init__(self, *args):
107 super().__init__(*args)
108
109 # Define the cluster aliases and one handler for each cluster database created event.
110 self.database = DatabaseRequires(
111 self,
112 relation_name="database",
113 database_name="database",
114 relations_aliases = ["cluster1", "cluster2"],
115 )
116 self.framework.observe(
117 self.database.on.cluster1_database_created, self._on_cluster1_database_created
118 )
119 self.framework.observe(
120 self.database.on.cluster2_database_created, self._on_cluster2_database_created
121 )
122
123 def _on_cluster1_database_created(self, event: DatabaseCreatedEvent) -> None:
124 # Handle the created database on the cluster named cluster1
125
126 # Create configuration file for app
127 config_file = self._render_app_config_file(
128 event.username,
129 event.password,
130 event.endpoints,
131 )
132 ...
133
134 def _on_cluster2_database_created(self, event: DatabaseCreatedEvent) -> None:
135 # Handle the created database on the cluster named cluster2
136
137 # Create configuration file for app
138 config_file = self._render_app_config_file(
139 event.username,
140 event.password,
141 event.endpoints,
142 )
143 ...
144
145 ```
146
147 ### Provider Charm
148
149 Following an example of using the DatabaseRequestedEvent, in the context of the
150 database charm code:
151
152 ```python
153 from charms.data_platform_libs.v0.data_interfaces import DatabaseProvides
154
155 class SampleCharm(CharmBase):
156
157 def __init__(self, *args):
158 super().__init__(*args)
159 # Charm events defined in the database provides charm library.
160 self.provided_database = DatabaseProvides(self, relation_name="database")
161 self.framework.observe(self.provided_database.on.database_requested,
162 self._on_database_requested)
163 # Database generic helper
164 self.database = DatabaseHelper()
165
166 def _on_database_requested(self, event: DatabaseRequestedEvent) -> None:
167 # Handle the event triggered by a new database requested in the relation
168 # Retrieve the database name using the charm library.
169 db_name = event.database
170 # generate a new user credential
171 username = self.database.generate_user()
172 password = self.database.generate_password()
173 # set the credentials for the relation
174 self.provided_database.set_credentials(event.relation.id, username, password)
175 # set other variables for the relation event.set_tls("False")
176 ```
177 As shown above, the library provides a custom event (database_requested) to handle
178 the situation when an application charm requests a new database to be created.
179 It's preferred to subscribe to this event instead of relation changed event to avoid
180 creating a new database when information other than a database name is
181 exchanged in the relation databag.
182
183 ### Kafka
184
185 This library is the interface to use and interact with the Kafka charm. This library contains
186 custom events that add convenience to manage Kafka, and provides methods to consume the
187 application related data.
188
189 #### Requirer Charm
190
191 ```python
192
193 from charms.data_platform_libs.v0.data_interfaces import (
194 BootstrapServerChangedEvent,
195 KafkaRequires,
196 TopicCreatedEvent,
197 )
198
199 class ApplicationCharm(CharmBase):
200
201 def __init__(self, *args):
202 super().__init__(*args)
203 self.kafka = KafkaRequires(self, "kafka_client", "test-topic")
204 self.framework.observe(
205 self.kafka.on.bootstrap_server_changed, self._on_kafka_bootstrap_server_changed
206 )
207 self.framework.observe(
208 self.kafka.on.topic_created, self._on_kafka_topic_created
209 )
210
211 def _on_kafka_bootstrap_server_changed(self, event: BootstrapServerChangedEvent):
212 # Event triggered when a bootstrap server was changed for this application
213
214 new_bootstrap_server = event.bootstrap_server
215 ...
216
217 def _on_kafka_topic_created(self, event: TopicCreatedEvent):
218 # Event triggered when a topic was created for this application
219 username = event.username
220 password = event.password
221 tls = event.tls
222 tls_ca = event.tls_ca
223 bootstrap_server = event.bootstrap_server
224 consumer_group_prefix = event.consumer_group_prefix
225 zookeeper_uris = event.zookeeper_uris
226 ...
227
228 ```
229
230 As shown above, the library provides some custom events to handle specific situations,
231 which are listed below:
232
233 - topic_created: event emitted when the requested topic is created.
234 - bootstrap_server_changed: event emitted when the bootstrap server has changed.
235 - credential_changed: event emitted when the credentials of Kafka changed.
236
237 ### Provider Charm
238
239 Following the previous example, this is an example of the provider charm.
240
241 ```python
242 class SampleCharm(CharmBase):
243
244 from charms.data_platform_libs.v0.data_interfaces import (
245 KafkaProvides,
246 TopicRequestedEvent,
247 )
248
249 def __init__(self, *args):
250 super().__init__(*args)
251
252 # Default charm events.
253 self.framework.observe(self.on.start, self._on_start)
254
255 # Charm events defined in the Kafka Provides charm library.
256 self.kafka_provider = KafkaProvides(self, relation_name="kafka_client")
257 self.framework.observe(self.kafka_provider.on.topic_requested, self._on_topic_requested)
258 # Kafka generic helper
259 self.kafka = KafkaHelper()
260
261 def _on_topic_requested(self, event: TopicRequestedEvent):
262 # Handle the on_topic_requested event.
263
264 topic = event.topic
265 relation_id = event.relation.id
266 # set connection info in the databag relation
267 self.kafka_provider.set_bootstrap_server(relation_id, self.kafka.get_bootstrap_server())
268 self.kafka_provider.set_credentials(relation_id, username=username, password=password)
269 self.kafka_provider.set_consumer_group_prefix(relation_id, ...)
270 self.kafka_provider.set_tls(relation_id, "False")
271 self.kafka_provider.set_zookeeper_uris(relation_id, ...)
272
273 ```
274 As shown above, the library provides a custom event (topic_requested) to handle
275 the situation when an application charm requests a new topic to be created.
276 It is preferred to subscribe to this event instead of relation changed event to avoid
277 creating a new topic when information other than a topic name is
278 exchanged in the relation databag.
279 """
280
281 import json
282 import logging
283 from abc import ABC, abstractmethod
284 from collections import namedtuple
285 from datetime import datetime
286 from typing import List, Optional
287
288 from ops.charm import (
289 CharmBase,
290 CharmEvents,
291 RelationChangedEvent,
292 RelationEvent,
293 RelationJoinedEvent,
294 )
295 from ops.framework import EventSource, Object
296 from ops.model import Relation
297
# Unique Charmhub identifier of this library; it must never be modified.
LIBID = "6c3e6b6680d64e9c89e611d1a15f65be"

# Major API version; bump it whenever a breaking change is introduced.
LIBAPI = 0

# Patch version; bump it before running `charmcraft publish-lib`, or reset it
# to 0 when the major API version is raised.
LIBPATCH = 7

PYDEPS = ["ops>=2.0.0"]

# Module-level logger shared by every class and helper in this library.
logger = logging.getLogger(__name__)
311
# Lightweight record describing how one data mapping differs from another.
Diff = namedtuple("Diff", ["added", "changed", "deleted"])
Diff.__doc__ = """
A tuple for storing the diff between two data mappings.

added - keys that were added
changed - keys that still exist but have new values
deleted - key that were deleted"""
319
320
def diff(event: RelationChangedEvent, bucket: str) -> Diff:
    """Compute what changed in the relation databag since the previous call.

    The previous state is kept as a JSON snapshot under the "data" key of the
    ``bucket`` databag; the snapshot is refreshed on every call so that the
    next invocation compares against the state seen now.

    Args:
        event: relation changed event.
        bucket: owner of the databag holding the snapshot (callers in this
            module pass the local application or the local unit).

    Returns:
        a Diff instance containing the added, changed and deleted keys of
        the databag of ``event.app``.
    """
    # Snapshot written by the previous call (empty when there is none yet).
    snapshot_bag = event.relation.data[bucket]
    previous = json.loads(snapshot_bag.get("data", "{}"))

    # Current content of the databag of the application that triggered the
    # event; the bookkeeping "data" key is never part of the comparison.
    current = {}
    for key, value in event.relation.data[event.app].items():
        if key != "data":
            current[key] = value

    previous_keys = previous.keys()
    current_keys = current.keys()

    # Keys present now but absent from the snapshot.
    added = current_keys - previous_keys
    # Keys present in the snapshot but gone now.
    deleted = previous_keys - current_keys
    # Keys present on both sides whose value differs.
    changed = {key for key in previous_keys & current_keys if previous[key] != current[key]}

    # Persist the current state for the next comparison.
    snapshot_bag.update({"data": json.dumps(current)})

    return Diff(added, changed, deleted)
351
352
353 # Base DataProvides and DataRequires
354
355
class DataProvides(Object, ABC):
    """Base provides-side of the data products relation.

    Observes the relation-changed event of ``relation_name`` and offers helpers
    that write connection information into the local application databag.
    """

    def __init__(self, charm: CharmBase, relation_name: str) -> None:
        """Store the charm context and observe the relation-changed event.

        Args:
            charm: the charm instantiating this object.
            relation_name: name of the relation endpoint to manage.
        """
        super().__init__(charm, relation_name)
        self.charm = charm
        self.local_app = self.charm.model.app
        self.local_unit = self.charm.unit
        self.relation_name = relation_name
        self.framework.observe(
            charm.on[relation_name].relation_changed,
            self._on_relation_changed,
        )

    def _diff(self, event: RelationChangedEvent) -> Diff:
        """Retrieves the diff of the data in the relation changed databag.

        The snapshot used for the comparison is stored in the local
        application databag.

        Args:
            event: relation changed event.

        Returns:
            a Diff instance containing the added, deleted and changed
                keys from the event relation databag.
        """
        return diff(event, self.local_app)

    @abstractmethod
    def _on_relation_changed(self, event: RelationChangedEvent) -> None:
        """Event emitted when the relation data has changed."""
        raise NotImplementedError

    def fetch_relation_data(self) -> dict:
        """Retrieves data from relation.

        This function can be used to retrieve data from a relation
        in the charm code when outside an event callback.

        Returns:
            a dict of the values stored in the relation data bag
                for all relation instances (indexed by the relation id).
        """
        data = {}
        for relation in self.relations:
            # The "data" key only holds the diff snapshot, so it is left out.
            data[relation.id] = {
                key: value for key, value in relation.data[relation.app].items() if key != "data"
            }
        return data

    def _update_relation_data(self, relation_id: int, data: dict) -> None:
        """Updates a set of key-value pairs in the relation.

        This function writes in the application data bag, therefore,
        only the leader unit can call it. On non-leader units the call
        is a no-op.

        Args:
            relation_id: the identifier for a particular relation.
            data: dict containing the key-value pairs
                that should be updated in the relation.
        """
        if self.local_unit.is_leader():
            relation = self.charm.model.get_relation(self.relation_name, relation_id)
            relation.data[self.local_app].update(data)

    @property
    def relations(self) -> List[Relation]:
        """The list of Relation instances associated with this relation_name."""
        return list(self.charm.model.relations[self.relation_name])

    def set_credentials(self, relation_id: int, username: str, password: str) -> None:
        """Set credentials.

        This function writes in the application data bag, therefore,
        only the leader unit can call it.

        Args:
            relation_id: the identifier for a particular relation.
            username: user that was created.
            password: password of the created user.
        """
        self._update_relation_data(
            relation_id,
            {
                "username": username,
                "password": password,
            },
        )

    def set_tls(self, relation_id: int, tls: str) -> None:
        """Set whether TLS is enabled.

        Args:
            relation_id: the identifier for a particular relation.
            tls: whether tls is enabled (True or False).
        """
        self._update_relation_data(relation_id, {"tls": tls})

    def set_tls_ca(self, relation_id: int, tls_ca: str) -> None:
        """Set the TLS CA in the application relation databag.

        The value is stored under the ``tls-ca`` key, which is the key that
        ``AuthenticationEvent.tls_ca`` reads on the requirer side.

        Args:
            relation_id: the identifier for a particular relation.
            tls_ca: TLS certification authority.
        """
        # The key was previously written as "tls_ca", which the event
        # accessor (reading "tls-ca") could never find.
        self._update_relation_data(relation_id, {"tls-ca": tls_ca})
460
461
class DataRequires(Object, ABC):
    """Requires-side of the relation.

    Observes the relation-joined and relation-changed events of
    ``relation_name`` and offers helpers to read what the provider shared.
    """

    def __init__(
        self,
        charm,
        relation_name: str,
        extra_user_roles: Optional[str] = None,
    ):
        """Manager of base client relations.

        Args:
            charm: the charm instantiating this object.
            relation_name: name of the relation endpoint to manage.
            extra_user_roles: optional extra roles to request for the user.
        """
        super().__init__(charm, relation_name)
        self.charm = charm
        self.extra_user_roles = extra_user_roles
        self.local_app = self.charm.model.app
        self.local_unit = self.charm.unit
        self.relation_name = relation_name
        self.framework.observe(
            self.charm.on[relation_name].relation_joined, self._on_relation_joined_event
        )
        self.framework.observe(
            self.charm.on[relation_name].relation_changed, self._on_relation_changed_event
        )

    @abstractmethod
    def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None:
        """Event emitted when the application joins the relation."""
        raise NotImplementedError

    @abstractmethod
    def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
        """Event emitted when the relation data has changed."""
        raise NotImplementedError

    def fetch_relation_data(self) -> dict:
        """Retrieves data from relation.

        This function can be used to retrieve data from a relation
        in the charm code when outside an event callback.
        Function cannot be used in `*-relation-broken` events and will raise an exception.

        Returns:
            a dict of the values stored in the relation data bag
                for all relation instances (indexed by the relation ID).
        """
        data = {}
        for relation in self.relations:
            # The "data" key only holds the diff snapshot, so it is left out.
            data[relation.id] = {
                key: value for key, value in relation.data[relation.app].items() if key != "data"
            }
        return data

    def _update_relation_data(self, relation_id: int, data: dict) -> None:
        """Updates a set of key-value pairs in the relation.

        This function writes in the application data bag, therefore,
        only the leader unit can call it. On non-leader units the call
        is a no-op.

        Args:
            relation_id: the identifier for a particular relation.
            data: dict containing the key-value pairs
                that should be updated in the relation.
        """
        if self.local_unit.is_leader():
            relation = self.charm.model.get_relation(self.relation_name, relation_id)
            relation.data[self.local_app].update(data)

    def _diff(self, event: RelationChangedEvent) -> Diff:
        """Retrieves the diff of the data in the relation changed databag.

        The snapshot used for the comparison is stored in the local unit
        databag.

        Args:
            event: relation changed event.

        Returns:
            a Diff instance containing the added, deleted and changed
                keys from the event relation databag.
        """
        return diff(event, self.local_unit)

    @property
    def relations(self) -> List[Relation]:
        """The list of active Relation instances associated with this relation_name."""
        return [
            relation
            for relation in self.charm.model.relations[self.relation_name]
            if self._is_relation_active(relation)
        ]

    @staticmethod
    def _is_relation_active(relation: Relation) -> bool:
        """Return whether the data of ``relation`` can be rendered.

        A relation is reported as inactive when ``repr(relation.data)``
        raises RuntimeError.
        NOTE(review): presumably this is how relations that are being torn
        down are detected -- confirm against the ops framework.
        """
        try:
            _ = repr(relation.data)
            return True
        except RuntimeError:
            return False

    @staticmethod
    def _is_resource_created_for_relation(relation: Relation) -> bool:
        """Return whether both "username" and "password" are in the app databag."""
        return (
            "username" in relation.data[relation.app] and "password" in relation.data[relation.app]
        )

    def is_resource_created(self, relation_id: Optional[int] = None) -> bool:
        """Check if the resource has been created.

        This function can be used to check if the Provider answered with data in the charm code
        when outside an event callback.

        Args:
            relation_id (int, optional): When provided the check is done only for the relation id
                provided, otherwise the check is done for all relations

        Returns:
            True or False. Without a relation id, False is returned when
            there is no active relation at all.

        Raises:
            IndexError: If relation_id is provided but that relation does not exist
        """
        # Evaluate the property once: each access re-filters every relation.
        relations = self.relations

        if relation_id is None:
            return bool(relations) and all(
                self._is_resource_created_for_relation(relation) for relation in relations
            )

        matching = [relation for relation in relations if relation.id == relation_id]
        if not matching:
            # Only the failed lookup is reported as IndexError; errors raised
            # while inspecting the databag are no longer masked by it.
            raise IndexError(f"relation id {relation_id} cannot be accessed")
        return self._is_resource_created_for_relation(matching[0])
597
598
599 # General events
600
601
class ExtraRoleEvent(RelationEvent):
    """Relation event exposing the extra user roles stored in the app databag."""

    @property
    def extra_user_roles(self) -> Optional[str]:
        """Return the requested extra user roles, or None when the key is absent."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("extra-user-roles")
609
610
class AuthenticationEvent(RelationEvent):
    """Relation event exposing the authentication fields of the app databag.

    Every property returns None when its key is not present in the databag.
    """

    @property
    def username(self) -> Optional[str]:
        """Return the created username."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("username")

    @property
    def password(self) -> Optional[str]:
        """Return the password of the created user."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("password")

    @property
    def tls(self) -> Optional[str]:
        """Return whether TLS is configured."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("tls")

    @property
    def tls_ca(self) -> Optional[str]:
        """Return the TLS CA (stored under the "tls-ca" key)."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("tls-ca")
633
634
635 # Database related events and fields
636
637
class DatabaseProvidesEvent(RelationEvent):
    """Base class for the database events handled on the provider side."""

    @property
    def database(self) -> Optional[str]:
        """Return the name of the requested database, or None when absent."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("database")
645
646
class DatabaseRequestedEvent(DatabaseProvidesEvent, ExtraRoleEvent):
    """Event emitted when a new database is requested for use on this relation.

    Combines the ``database`` and ``extra_user_roles`` accessors of its bases.
    """
649
650
class DatabaseProvidesEvents(CharmEvents):
    """Events emitted by the provider side of the database relation.

    Declares ``database_requested`` as the only custom event.
    """

    database_requested = EventSource(DatabaseRequestedEvent)
658
659
class DatabaseRequiresEvent(RelationEvent):
    """Base class for the database events handled on the requirer side.

    Every property reads the databag of the relation's application and
    returns None when the corresponding key is not present.
    """

    @property
    def endpoints(self) -> Optional[str]:
        """Return a comma separated list of read/write endpoints."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("endpoints")

    @property
    def read_only_endpoints(self) -> Optional[str]:
        """Return a comma separated list of read only endpoints."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("read-only-endpoints")

    @property
    def replset(self) -> Optional[str]:
        """Return the replicaset name.

        MongoDB only.
        """
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("replset")

    @property
    def uris(self) -> Optional[str]:
        """Return the connection URIs.

        MongoDB, Redis, OpenSearch.
        """
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("uris")

    @property
    def version(self) -> Optional[str]:
        """Return the version of the database.

        Version as informed by the database daemon.
        """
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("version")
696
697
class DatabaseCreatedEvent(AuthenticationEvent, DatabaseRequiresEvent):
    """Event emitted when a new database is created for use on this relation.

    Exposes both the authentication fields and the database connection fields.
    """
700
701
class DatabaseEndpointsChangedEvent(AuthenticationEvent, DatabaseRequiresEvent):
    """Event emitted when the read/write endpoints are changed.

    Exposes both the authentication fields and the database connection fields.
    """
704
705
class DatabaseReadOnlyEndpointsChangedEvent(AuthenticationEvent, DatabaseRequiresEvent):
    """Event emitted when the read only endpoints are changed.

    Exposes both the authentication fields and the database connection fields.
    """
708
709
class DatabaseRequiresEvents(CharmEvents):
    """Events emitted by the requirer side of the database relation.

    Declares one event source per situation the requirer can react to:
    database created, read/write endpoints changed and read only endpoints
    changed.
    """

    database_created = EventSource(DatabaseCreatedEvent)
    endpoints_changed = EventSource(DatabaseEndpointsChangedEvent)
    read_only_endpoints_changed = EventSource(DatabaseReadOnlyEndpointsChangedEvent)
719
720
721 # Database Provider and Requires
722
723
class DatabaseProvides(DataProvides):
    """Provider-side of the database relations."""

    on = DatabaseProvidesEvents()

    def __init__(self, charm: CharmBase, relation_name: str) -> None:
        """Delegate the whole setup to the base provider class."""
        super().__init__(charm, relation_name)

    def _on_relation_changed(self, event: RelationChangedEvent) -> None:
        """Emit ``database_requested`` when a database name shows up in the databag."""
        # Non-leader units ignore the event entirely.
        if self.local_unit.is_leader():
            # Work out which keys changed since the previous event.
            changes = self._diff(event)

            # A newly added "database" key means the application asked for
            # a database (optionally together with extra user roles).
            if "database" in changes.added:
                self.on.database_requested.emit(event.relation, app=event.app, unit=event.unit)

    def set_endpoints(self, relation_id: int, connection_strings: str) -> None:
        """Publish the primary (read/write) connection strings.

        Writes in the application data bag, so only the leader unit can call it.

        Args:
            relation_id: the identifier for a particular relation.
            connection_strings: database hosts and ports comma separated list.
        """
        payload = {"endpoints": connection_strings}
        self._update_relation_data(relation_id, payload)

    def set_read_only_endpoints(self, relation_id: int, connection_strings: str) -> None:
        """Publish the replica (read only) connection strings.

        Writes in the application data bag, so only the leader unit can call it.

        Args:
            relation_id: the identifier for a particular relation.
            connection_strings: database hosts and ports comma separated list.
        """
        payload = {"read-only-endpoints": connection_strings}
        self._update_relation_data(relation_id, payload)

    def set_replset(self, relation_id: int, replset: str) -> None:
        """Publish the replica set name in the application relation databag.

        MongoDB only.

        Args:
            relation_id: the identifier for a particular relation.
            replset: replica set name.
        """
        payload = {"replset": replset}
        self._update_relation_data(relation_id, payload)

    def set_uris(self, relation_id: int, uris: str) -> None:
        """Publish the database connection URIs in the application relation databag.

        MongoDB, Redis, and OpenSearch only.

        Args:
            relation_id: the identifier for a particular relation.
            uris: connection URIs.
        """
        payload = {"uris": uris}
        self._update_relation_data(relation_id, payload)

    def set_version(self, relation_id: int, version: str) -> None:
        """Publish the database version in the application relation databag.

        Args:
            relation_id: the identifier for a particular relation.
            version: database version.
        """
        payload = {"version": version}
        self._update_relation_data(relation_id, payload)
800
801
class DatabaseRequires(DataRequires):
    """Requires-side of the database relation."""

    on = DatabaseRequiresEvents()

    def __init__(
        self,
        charm,
        relation_name: str,
        database_name: str,
        extra_user_roles: Optional[str] = None,
        relations_aliases: Optional[List[str]] = None,
    ):
        """Manager of database client relations.

        Args:
            charm: the charm instantiating this object.
            relation_name: name of the relation endpoint to manage.
            database_name: name of the database to request.
            extra_user_roles: optional extra roles to request for the user.
            relations_aliases: optional aliases, one per allowed connection;
                three aliased events are defined for each of them.

        Raises:
            ValueError: if aliases are given and their number differs from
                the connection limit declared for the relation.
        """
        super().__init__(charm, relation_name, extra_user_roles)
        self.database = database_name
        self.relations_aliases = relations_aliases

        # Define custom event names for each alias.
        if relations_aliases:
            # The number of aliases must be exactly the connection limit
            # declared for this relation endpoint in the charm metadata.
            relation_connection_limit = self.charm.meta.requires[relation_name].limit
            if len(relations_aliases) != relation_connection_limit:
                raise ValueError(
                    "The number of aliases must match the maximum number of connections allowed in the relation. "
                    f"Expected {relation_connection_limit}, got {len(relations_aliases)}"
                )

            for relation_alias in relations_aliases:
                self.on.define_event(f"{relation_alias}_database_created", DatabaseCreatedEvent)
                self.on.define_event(
                    f"{relation_alias}_endpoints_changed", DatabaseEndpointsChangedEvent
                )
                self.on.define_event(
                    f"{relation_alias}_read_only_endpoints_changed",
                    DatabaseReadOnlyEndpointsChangedEvent,
                )

    def _assign_relation_alias(self, relation_id: int) -> None:
        """Assigns an alias to a relation.

        This function writes in the unit data bag.

        Args:
            relation_id: the identifier for a particular relation.
        """
        # If no aliases were provided, return immediately.
        if not self.relations_aliases:
            return

        target_relation = self.charm.model.get_relation(self.relation_name, relation_id)

        # Return if an alias was already assigned to this relation
        # (like when there are more than one unit joining the relation).
        if target_relation.data[self.local_unit].get("alias"):
            return

        # Retrieve the available aliases (the ones that weren't assigned to any relation).
        available_aliases = self.relations_aliases[:]
        for relation in self.charm.model.relations[self.relation_name]:
            alias = relation.data[self.local_unit].get("alias")
            if alias:
                logger.debug("Alias %s was already assigned to relation %d", alias, relation.id)
                available_aliases.remove(alias)

        # Set the alias in the unit relation databag of the specific relation.
        target_relation.data[self.local_unit].update({"alias": available_aliases[0]})

    def _emit_aliased_event(self, event: RelationChangedEvent, event_name: str) -> None:
        """Emit an aliased event to a particular relation if it has an alias.

        Args:
            event: the relation changed event that was received.
            event_name: the name of the event to emit.
        """
        alias = self._get_relation_alias(event.relation.id)
        if alias:
            getattr(self.on, f"{alias}_{event_name}").emit(
                event.relation, app=event.app, unit=event.unit
            )

    def _get_relation_alias(self, relation_id: int) -> Optional[str]:
        """Returns the relation alias.

        Args:
            relation_id: the identifier for a particular relation.

        Returns:
            the relation alias, or None if the relation was not found or
            has no alias in the local unit databag.
        """
        for relation in self.charm.model.relations[self.relation_name]:
            if relation.id == relation_id:
                return relation.data[self.local_unit].get("alias")
        return None

    def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None:
        """Event emitted when the application joins the database relation."""
        # If relations aliases were provided, assign one to the relation.
        self._assign_relation_alias(event.relation.id)

        # Always request the database; add the extra user roles only when
        # they were provided.
        requested = {"database": self.database}
        if self.extra_user_roles:
            requested["extra-user-roles"] = self.extra_user_roles
        self._update_relation_data(event.relation.id, requested)

    def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
        """Event emitted when the database relation has changed.

        At most one kind of custom event is emitted per call, checked in
        this order: database created, endpoints changed, read only
        endpoints changed. Each one is emitted both in its default form and,
        when the relation has an alias, in its aliased form.
        """
        # Check which data has changed to emit customs events.
        diff = self._diff(event)

        # Check if the database is created
        # (the database charm shared the credentials).
        if "username" in diff.added and "password" in diff.added:
            # Emit the default event (the one without an alias).
            logger.info("database created at %s", datetime.now())
            self.on.database_created.emit(event.relation, app=event.app, unit=event.unit)

            # Emit the aliased event (if any).
            self._emit_aliased_event(event, "database_created")

            # To avoid unnecessary application restarts do not trigger
            # "endpoints_changed" event if "database_created" is triggered.
            return

        # Emit an endpoints changed event if the database
        # added or changed this info in the relation databag.
        if "endpoints" in diff.added or "endpoints" in diff.changed:
            # Emit the default event (the one without an alias).
            logger.info("endpoints changed on %s", datetime.now())
            self.on.endpoints_changed.emit(event.relation, app=event.app, unit=event.unit)

            # Emit the aliased event (if any).
            self._emit_aliased_event(event, "endpoints_changed")

            # To avoid unnecessary application restarts do not trigger
            # "read_only_endpoints_changed" event if "endpoints_changed" is triggered.
            return

        # Emit a read only endpoints changed event if the database
        # added or changed this info in the relation databag.
        if "read-only-endpoints" in diff.added or "read-only-endpoints" in diff.changed:
            # Emit the default event (the one without an alias).
            logger.info("read-only-endpoints changed on %s", datetime.now())
            self.on.read_only_endpoints_changed.emit(
                event.relation, app=event.app, unit=event.unit
            )

            # Emit the aliased event (if any).
            self._emit_aliased_event(event, "read_only_endpoints_changed")
963
964
965 # Kafka related events
966
967
class KafkaProvidesEvent(RelationEvent):
    """Common base for the events handled on the Kafka provider side."""

    @property
    def topic(self) -> Optional[str]:
        """The topic that was requested.

        Returns:
            the value of the "topic" key in the databag of the relation
            application, or None when the key is not set.
        """
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("topic")
975
976
class TopicRequestedEvent(KafkaProvidesEvent, ExtraRoleEvent):
    """Event emitted when a new topic is requested for use on this relation.

    Inherits from both KafkaProvidesEvent (which exposes the requested
    `topic`) and ExtraRoleEvent; it defines no members of its own.
    """
979
980
class KafkaProvidesEvents(CharmEvents):
    """Events available on the provider side of the Kafka relation.

    This class defines the events that the Kafka provider can emit.
    """

    # Event source for TopicRequestedEvent.
    topic_requested = EventSource(TopicRequestedEvent)
988
989
class KafkaRequiresEvent(RelationEvent):
    """Common base for the events handled on the Kafka requirer side.

    Every property reads a single key from the databag of the relation
    application and yields None when that key is not set.
    """

    @property
    def bootstrap_server(self) -> Optional[str]:
        """Returns a comma-separated list of broker uris ("endpoints" key)."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("endpoints")

    @property
    def consumer_group_prefix(self) -> Optional[str]:
        """Returns the consumer-group-prefix ("consumer-group-prefix" key)."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("consumer-group-prefix")

    @property
    def zookeeper_uris(self) -> Optional[str]:
        """Returns a comma-separated list of Zookeeper uris ("zookeeper-uris" key)."""
        app_databag = self.relation.data[self.relation.app]
        return app_databag.get("zookeeper-uris")
1007
1008
class TopicCreatedEvent(AuthenticationEvent, KafkaRequiresEvent):
    """Event emitted when a new topic is created for use on this relation.

    Inherits from both AuthenticationEvent and KafkaRequiresEvent (which
    exposes `bootstrap_server`, `consumer_group_prefix` and
    `zookeeper_uris`); it defines no members of its own.
    """
1011
1012
class BootstrapServerChangedEvent(AuthenticationEvent, KafkaRequiresEvent):
    """Event emitted when the bootstrap server is changed.

    Inherits from both AuthenticationEvent and KafkaRequiresEvent (which
    exposes `bootstrap_server`, `consumer_group_prefix` and
    `zookeeper_uris`); it defines no members of its own.
    """
1015
1016
class KafkaRequiresEvents(CharmEvents):
    """Events available on the requirer side of the Kafka relation.

    This class defines the events that a Kafka client (requirer) can emit.
    """

    # Event source for TopicCreatedEvent.
    topic_created = EventSource(TopicCreatedEvent)
    # Event source for BootstrapServerChangedEvent.
    bootstrap_server_changed = EventSource(BootstrapServerChangedEvent)
1025
1026
1027 # Kafka Provides and Requires
1028
1029
class KafkaProvides(DataProvides):
    """Provider-side of the Kafka relation."""

    on = KafkaProvidesEvents()

    def __init__(self, charm: CharmBase, relation_name: str) -> None:
        """Set up the provider side by delegating to the base class.

        Args:
            charm: the charm passed through to the base class.
            relation_name: the relation name passed through to the base class.
        """
        super().__init__(charm, relation_name)

    def _on_relation_changed(self, event: RelationChangedEvent) -> None:
        """Emit `topic_requested` when a topic is added to the relation.

        Args:
            event: the relation changed event that was received.
        """
        # Only the leader should handle this event.
        if not self.local_unit.is_leader():
            return

        # Check which data has changed to emit customs events.
        newly_added = self._diff(event).added

        # The setup key (topic name and optional extra user roles) has not
        # been added to the relation databag by the application: nothing to do.
        if "topic" not in newly_added:
            return

        self.on.topic_requested.emit(event.relation, app=event.app, unit=event.unit)

    def set_bootstrap_server(self, relation_id: int, bootstrap_server: str) -> None:
        """Set the bootstrap server in the application relation databag.

        The value is stored under the "endpoints" key.

        Args:
            relation_id: the identifier for a particular relation.
            bootstrap_server: the bootstrap server address.
        """
        payload = {"endpoints": bootstrap_server}
        self._update_relation_data(relation_id, payload)

    def set_consumer_group_prefix(self, relation_id: int, consumer_group_prefix: str) -> None:
        """Set the consumer group prefix in the application relation databag.

        The value is stored under the "consumer-group-prefix" key.

        Args:
            relation_id: the identifier for a particular relation.
            consumer_group_prefix: the consumer group prefix string.
        """
        payload = {"consumer-group-prefix": consumer_group_prefix}
        self._update_relation_data(relation_id, payload)

    def set_zookeeper_uris(self, relation_id: int, zookeeper_uris: str) -> None:
        """Set the zookeeper uris in the application relation databag.

        The value is stored under the "zookeeper-uris" key.

        Args:
            relation_id: the identifier for a particular relation.
            zookeeper_uris: comma-separated list of ZooKeeper server uris.
        """
        payload = {"zookeeper-uris": zookeeper_uris}
        self._update_relation_data(relation_id, payload)
1078
1079
class KafkaRequires(DataRequires):
    """Requires-side of the Kafka relation."""

    on = KafkaRequiresEvents()

    def __init__(
        self, charm, relation_name: str, topic: str, extra_user_roles: Optional[str] = None
    ):
        """Manager of Kafka client relations.

        Args:
            charm: the charm object, passed through to the base class and
                kept on the instance.
            relation_name: the name of the Kafka relation.
            topic: the topic requested when the relation is joined.
            extra_user_roles: optional extra user roles requested together
                with the topic; the key is not sent at all when None.
        """
        super().__init__(charm, relation_name, extra_user_roles)
        self.charm = charm
        self.topic = topic

    def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None:
        """Event emitted when the application joins the Kafka relation.

        Args:
            event: the relation joined event that was received.
        """
        # Sets both topic and extra user roles in the relation
        # if the roles are provided. Otherwise, sets only the topic.
        relation_data = {"topic": self.topic}
        if self.extra_user_roles is not None:
            relation_data["extra-user-roles"] = self.extra_user_roles

        self._update_relation_data(event.relation.id, relation_data)

    def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
        """Event emitted when the Kafka relation has changed.

        Emits `topic_created` when both "username" and "password" were added
        to the relation data, otherwise `bootstrap_server_changed` when the
        "endpoints" key was added or changed. At most one of them is emitted.

        Args:
            event: the relation changed event that was received.
        """
        # Check which data has changed to emit customs events.
        diff = self._diff(event)

        # Check if the topic is created
        # (the Kafka charm shared the credentials).
        if "username" in diff.added and "password" in diff.added:
            # Emit the default event (the one without an alias).
            logger.info("topic created at %s", datetime.now())
            self.on.topic_created.emit(event.relation, app=event.app, unit=event.unit)

            # To avoid unnecessary application restarts do not trigger
            # the `bootstrap_server_changed` event if `topic_created` is triggered.
            return

        # Emit an endpoints (bootstrap-server) changed event if the Kafka endpoints
        # were added or changed in the relation databag.
        if "endpoints" in diff.added or "endpoints" in diff.changed:
            # Emit the default event (the one without an alias).
            logger.info("endpoints changed on %s", datetime.now())
            # TODO: confirm that emitting `bootstrap_server_changed` here is the right design.
            self.on.bootstrap_server_changed.emit(
                event.relation, app=event.app, unit=event.unit
            )
            return