1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from n2vc
.config
import EnvironConfig
24 from n2vc
.exceptions
import K8sException
25 from n2vc
.k8s_conn
import K8sConnector
26 from n2vc
.kubectl
import Kubectl
, CORE_CLIENT
, RBAC_CLIENT
27 from .exceptions
import MethodNotImplemented
28 from n2vc
.libjuju
import Libjuju
29 from n2vc
.utils
import obj_to_dict
, obj_to_yaml
30 from n2vc
.store
import MotorStore
31 from n2vc
.vca
.cloud
import Cloud
32 from n2vc
.vca
.connection
import get_connection
33 from kubernetes
.client
.models
import (
43 from typing
import Dict
45 SERVICE_ACCOUNT_TOKEN_KEY
= "token"
46 SERVICE_ACCOUNT_ROOT_CA_KEY
= "ca.crt"
47 RBAC_LABEL_KEY_NAME
= "rbac-id"
49 ADMIN_NAMESPACE
= "kube-system"
50 RBAC_STACK_PREFIX
= "juju-credential"
53 def generate_rbac_id():
54 return binascii
.hexlify(os
.urandom(4)).decode()
57 class K8sJujuConnector(K8sConnector
):
64 kubectl_command
: str = "/usr/bin/kubectl",
65 juju_command
: str = "/usr/bin/juju",
71 :param fs: file system for kubernetes and helm configuration
72 :param db: Database object
73 :param kubectl_command: path to kubectl executable
74 :param helm_command: path to helm executable
76 :param: loop: Asyncio loop
80 K8sConnector
.__init
__(
84 on_update_db
=on_update_db
,
88 self
.loop
= loop
or asyncio
.get_event_loop()
89 self
.log
.debug("Initializing K8S Juju connector")
91 db_uri
= EnvironConfig(prefixes
=["OSMLCM_", "OSMMON_"]).get("database_uri")
92 self
._store
= MotorStore(db_uri
)
93 self
.loading_libjuju
= asyncio
.Lock(loop
=self
.loop
)
95 self
.log
.debug("K8S Juju connector initialized")
96 # TODO: Remove these commented lines:
97 # self.authenticated = False
99 # self.juju_secret = ""
106 namespace
: str = "kube-system",
107 reuse_cluster_uuid
: str = None,
111 It prepares a given K8s cluster environment to run Juju bundles.
113 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
115 :param namespace: optional namespace to be used for juju. By default,
116 'kube-system' will be used
117 :param reuse_cluster_uuid: existing cluster uuid for reuse
118 :param: kwargs: Additional parameters
121 :return: uuid of the K8s cluster and True if connector has installed some
122 software in the cluster
123 (on error, an exception will be raised)
125 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
127 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
129 kubecfg
= tempfile
.NamedTemporaryFile()
130 with
open(kubecfg
.name
, "w") as kubecfg_file
:
131 kubecfg_file
.write(k8s_creds
)
132 kubectl
= Kubectl(config_file
=kubecfg
.name
)
134 # CREATING RESOURCES IN K8S
135 rbac_id
= generate_rbac_id()
136 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
137 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
139 # Create cleanup dictionary to clean up created resources
140 # if it fails in the middle of the process
143 self
._create
_cluster
_role
(
150 "delete": self
._delete
_cluster
_role
,
151 "args": (kubectl
, metadata_name
),
155 self
._create
_service
_account
(
162 "delete": self
._delete
_service
_account
,
163 "args": (kubectl
, metadata_name
),
167 self
._create
_cluster
_role
_binding
(
174 "delete": self
._delete
_service
_account
,
175 "args": (kubectl
, metadata_name
),
178 token
, client_cert_data
= await self
._get
_secret
_data
(
183 default_storage_class
= kubectl
.get_default_storage_class()
184 await libjuju
.add_k8s(
188 client_cert_data
=client_cert_data
,
189 configuration
=kubectl
.configuration
,
190 storage_class
=default_storage_class
,
191 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
193 return cluster_uuid
, True
194 except Exception as e
:
195 self
.log
.error("Error initializing k8scluster: {}".format(e
))
196 if len(cleanup_data
) > 0:
197 self
.log
.debug("Cleaning up created resources in k8s cluster...")
198 for item
in cleanup_data
:
199 delete_function
= item
["delete"]
200 delete_args
= item
["args"]
201 delete_function(*delete_args
)
202 self
.log
.debug("Cleanup finished")
205 """Repo Management"""
211 _type
: str = "charm",
213 raise MethodNotImplemented()
215 async def repo_list(self
):
216 raise MethodNotImplemented()
218 async def repo_remove(
222 raise MethodNotImplemented()
224 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
226 Returns None as currently add_repo is not implemented
236 uninstall_sw
: bool = False,
241 Resets the Kubernetes cluster by removing the model that represents it.
243 :param cluster_uuid str: The UUID of the cluster to reset
244 :param force: Force reset
245 :param uninstall_sw: Boolean to uninstall sw
246 :param: kwargs: Additional parameters
249 :return: Returns True if successful or raises an exception.
253 self
.log
.debug("[reset] Removing k8s cloud")
254 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
256 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
258 cloud_creds
= await libjuju
.get_cloud_credentials(cloud
)
260 await libjuju
.remove_cloud(cluster_uuid
)
262 kubecfg
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
264 kubecfg_file
= tempfile
.NamedTemporaryFile()
265 with
open(kubecfg_file
.name
, "w") as f
:
267 kubectl
= Kubectl(config_file
=kubecfg_file
.name
)
270 self
._delete
_cluster
_role
_binding
,
271 self
._delete
_service
_account
,
272 self
._delete
_cluster
_role
,
275 credential_attrs
= cloud_creds
[0].result
["attrs"]
276 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
277 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
278 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
279 delete_args
= (kubectl
, metadata_name
)
280 for delete_func
in delete_functions
:
282 delete_func(*delete_args
)
283 except Exception as e
:
284 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
286 except Exception as e
:
287 self
.log
.debug("Caught exception during reset: {}".format(e
))
299 timeout
: float = 1800,
301 db_dict
: dict = None,
302 kdu_name
: str = None,
303 namespace
: str = None,
308 :param cluster_uuid str: The UUID of the cluster to install to
309 :param kdu_model str: The name or path of a bundle to install
310 :param kdu_instance: Kdu instance name
311 :param atomic bool: If set, waits until the model is active and resets
312 the cluster on failure.
313 :param timeout int: The time, in seconds, to wait for the install
315 :param params dict: Key-value pairs of instantiation parameters
316 :param kdu_name: Name of the KDU instance to be installed
317 :param namespace: K8s namespace to use for the KDU instance
318 :param kwargs: Additional parameters
321 :return: If successful, returns ?
323 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
327 raise K8sException("db_dict must be set")
329 raise K8sException("bundle must be set")
331 if bundle
.startswith("cs:"):
333 elif bundle
.startswith("http"):
337 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
338 os
.chdir(new_workdir
)
339 bundle
= "local:{}".format(kdu_model
)
341 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
343 # Create the new model
344 self
.log
.debug("Adding model: {}".format(kdu_instance
))
345 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
346 await libjuju
.add_model(kdu_instance
, cloud
)
349 # TODO: Instantiation parameters
352 "Juju bundle that models the KDU, in any of the following ways:
353 - <juju-repo>/<juju-bundle>
354 - <juju-bundle folder under k8s_models folder in the package>
355 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
357 - <URL_where_to_fetch_juju_bundle>
360 previous_workdir
= os
.getcwd()
361 except FileNotFoundError
:
362 previous_workdir
= "/app/storage"
364 self
.log
.debug("[install] deploying {}".format(bundle
))
365 await libjuju
.deploy(
366 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
368 os
.chdir(previous_workdir
)
369 if self
.on_update_db
:
370 await self
.on_update_db(
373 filter=db_dict
["filter"],
374 vca_id
=kwargs
.get("vca_id"),
383 total_timeout
: float = 1800,
386 """Scale an application in a model
388 :param: kdu_instance str: KDU instance name
389 :param: scale int: Scale to which to set this application
390 :param: resource_name str: Resource name (Application name)
391 :param: timeout float: The time, in seconds, to wait for the install
393 :param kwargs: Additional parameters
396 :return: If successful, returns True
400 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
401 await libjuju
.scale_application(
402 model_name
=kdu_instance
,
403 application_name
=resource_name
,
405 total_timeout
=total_timeout
,
407 except Exception as e
:
408 error_msg
= "Error scaling application {} in kdu instance {}: {}".format(
409 resource_name
, kdu_instance
, e
411 self
.log
.error(error_msg
)
412 raise K8sException(message
=error_msg
)
415 async def get_scale_count(
421 """Get an application scale count
423 :param: resource_name str: Resource name (Application name)
424 :param: kdu_instance str: KDU instance name
425 :param kwargs: Additional parameters
427 :return: Return application instance count
430 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
431 status
= await libjuju
.get_model_status(kdu_instance
)
432 return len(status
.applications
[resource_name
].units
)
433 except Exception as e
:
434 error_msg
= "Error getting scale count from application {} in kdu instance {}: {}".format(
435 resource_name
, kdu_instance
, e
437 self
.log
.error(error_msg
)
438 raise K8sException(message
=error_msg
)
440 async def instances_list(self
, cluster_uuid
: str) -> list:
442 returns a list of deployed releases in a cluster
444 :param cluster_uuid: the cluster
453 kdu_model
: str = None,
458 :param cluster_uuid str: The UUID of the cluster to upgrade
459 :param kdu_instance str: The unique name of the KDU instance
460 :param kdu_model str: The name or path of the bundle to upgrade to
461 :param params dict: Key-value pairs of instantiation parameters
463 :return: If successful, reference to the new revision number of the
467 # TODO: Loop through the bundle and upgrade each charm individually
470 The API doesn't have a concept of bundle upgrades, because there are
471 many possible changes: charm revision, disk, number of units, etc.
473 As such, we are only supporting a limited subset of upgrades. We'll
474 upgrade the charm revision but leave storage and scale untouched.
476 Scale changes should happen through OSM constructs, and changes to
477 storage would require a redeployment of the service, at least in this
480 raise MethodNotImplemented()
492 :param cluster_uuid str: The UUID of the cluster to rollback
493 :param kdu_instance str: The unique name of the KDU instance
494 :param revision int: The revision to revert to. If omitted, rolls back
495 the previous upgrade.
497 :return: If successful, returns the revision of active KDU instance,
498 or raises an exception
500 raise MethodNotImplemented()
510 """Uninstall a KDU instance
512 :param cluster_uuid str: The UUID of the cluster
513 :param kdu_instance str: The unique name of the KDU instance
514 :param kwargs: Additional parameters
517 :return: Returns True if successful, or raises an exception
520 self
.log
.debug("[uninstall] Destroying model")
521 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
523 await libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
525 # self.log.debug("[uninstall] Model destroyed and disconnecting")
526 # await controller.disconnect()
529 # TODO: Remove these commented lines
530 # if not self.authenticated:
531 # self.log.debug("[uninstall] Connecting to controller")
532 # await self.login(cluster_uuid)
534 async def exec_primitive(
536 cluster_uuid
: str = None,
537 kdu_instance
: str = None,
538 primitive_name
: str = None,
539 timeout
: float = 300,
541 db_dict
: dict = None,
544 """Exec primitive (Juju action)
546 :param cluster_uuid str: The UUID of the cluster
547 :param kdu_instance str: The unique name of the KDU instance
548 :param primitive_name: Name of action that will be executed
549 :param timeout: Timeout for action execution
550 :param params: Dictionary of all the parameters needed for the action
551 :param db_dict: Dictionary for any additional data
552 :param kwargs: Additional parameters
555 :return: Returns the output of the action
557 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
559 if not params
or "application-name" not in params
:
561 "Missing application-name argument, \
562 argument needed for K8s actions"
566 "[exec_primitive] Getting model "
567 "kdu_instance: {}".format(kdu_instance
)
569 application_name
= params
["application-name"]
570 actions
= await libjuju
.get_actions(application_name
, kdu_instance
)
571 if primitive_name
not in actions
:
572 raise K8sException("Primitive {} not found".format(primitive_name
))
573 output
, status
= await libjuju
.execute_action(
574 application_name
, kdu_instance
, primitive_name
, **params
577 if status
!= "completed":
579 "status is not completed: {} output: {}".format(status
, output
)
581 if self
.on_update_db
:
582 await self
.on_update_db(
583 cluster_uuid
, kdu_instance
, filter=db_dict
["filter"]
588 except Exception as e
:
589 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
590 self
.log
.error(error_msg
)
591 raise K8sException(message
=error_msg
)
595 async def inspect_kdu(
601 Inspects a bundle and returns a dictionary of config parameters and
602 their default values.
604 :param kdu_model str: The name or path of the bundle to inspect.
606 :return: If successful, returns a dictionary of available parameters
607 and their default values.
611 if not os
.path
.exists(kdu_model
):
612 raise K8sException("file {} not found".format(kdu_model
))
614 with
open(kdu_model
, "r") as f
:
615 bundle
= yaml
.safe_load(f
.read())
619 'description': 'Test bundle',
620 'bundle': 'kubernetes',
623 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
626 'password': 'manopw',
627 'root_password': 'osm4u',
630 'series': 'kubernetes'
635 # TODO: This should be returned in an agreed-upon format
636 kdu
= bundle
["applications"]
646 If available, returns the README of the bundle.
648 :param kdu_model str: The name or path of a bundle
650 :return: If found, returns the contents of the README.
654 files
= ["README", "README.txt", "README.md"]
655 path
= os
.path
.dirname(kdu_model
)
656 for file in os
.listdir(path
):
658 with
open(file, "r") as f
:
664 async def status_kdu(
668 complete_status
: bool = False,
669 yaml_format
: bool = False,
672 """Get the status of the KDU
674 Get the current status of the KDU instance.
676 :param cluster_uuid str: The UUID of the cluster
677 :param kdu_instance str: The unique id of the KDU instance
678 :param complete_status: To get the complete_status of the KDU
679 :param yaml_format: To get the status in proper format for NSR record
680 :param: kwargs: Additional parameters
683 :return: Returns a dictionary containing namespace, state, resources,
684 and deployment_time and returns complete_status if complete_status is True
686 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
689 model_status
= await libjuju
.get_model_status(kdu_instance
)
691 if not complete_status
:
692 for name
in model_status
.applications
:
693 application
= model_status
.applications
[name
]
694 status
[name
] = {"status": application
["status"]["status"]}
697 return obj_to_yaml(model_status
)
699 return obj_to_dict(model_status
)
703 async def update_vca_status(self
, vcastatus
: dict, kdu_instance
: str, **kwargs
):
705 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
707 :param vcastatus dict: dict containing vcastatus
708 :param kdu_instance str: The unique id of the KDU instance
709 :param: kwargs: Additional parameters
714 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
716 for model_name
in vcastatus
:
717 # Adding executed actions
718 vcastatus
[model_name
][
720 ] = await libjuju
.get_executed_actions(kdu_instance
)
722 for application
in vcastatus
[model_name
]["applications"]:
723 # Adding application actions
724 vcastatus
[model_name
]["applications"][application
][
726 ] = await libjuju
.get_actions(application
, kdu_instance
)
727 # Adding application configs
728 vcastatus
[model_name
]["applications"][application
][
730 ] = await libjuju
.get_application_configs(kdu_instance
, application
)
732 except Exception as e
:
733 self
.log
.debug("Error in updating vca status: {}".format(str(e
)))
735 async def get_services(
736 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
738 """Return a list of services of a kdu_instance"""
740 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
742 kubecfg
= tempfile
.NamedTemporaryFile()
743 with
open(kubecfg
.name
, "w") as kubecfg_file
:
744 kubecfg_file
.write(credentials
)
745 kubectl
= Kubectl(config_file
=kubecfg
.name
)
747 return kubectl
.get_services(
748 field_selector
="metadata.namespace={}".format(kdu_instance
)
751 async def get_service(
752 self
, cluster_uuid
: str, service_name
: str, namespace
: str
754 """Return data for a specific service inside a namespace"""
756 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
758 kubecfg
= tempfile
.NamedTemporaryFile()
759 with
open(kubecfg
.name
, "w") as kubecfg_file
:
760 kubecfg_file
.write(credentials
)
761 kubectl
= Kubectl(config_file
=kubecfg
.name
)
763 return kubectl
.get_services(
764 field_selector
="metadata.name={},metadata.namespace={}".format(
765 service_name
, namespace
769 def get_credentials(self
, cluster_uuid
: str) -> str:
771 Get Cluster Kubeconfig
773 k8scluster
= self
.db
.get_one(
774 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
777 self
.db
.encrypt_decrypt_fields(
778 k8scluster
.get("credentials"),
780 ["password", "secret"],
781 schema_version
=k8scluster
["schema_version"],
782 salt
=k8scluster
["_id"],
785 return yaml
.safe_dump(k8scluster
.get("credentials"))
787 def _get_credential_name(self
, cluster_uuid
: str) -> str:
789 Get credential name for a k8s cloud
791 We cannot use the cluster_uuid for the credential name directly,
792 because it cannot start with a number, it must start with a letter.
793 Therefore, the k8s cloud credential name will be "cred-" followed
796 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
798 :return: Name to use for the credential name.
800 return "cred-{}".format(cluster_uuid
)
806 """Get the namespace UUID
807 Gets the namespace's unique name
809 :param cluster_uuid str: The UUID of the cluster
810 :returns: The namespace UUID, or raises an exception
814 def _create_cluster_role(
818 labels
: Dict
[str, str],
820 cluster_roles
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role(
821 field_selector
="metadata.name={}".format(name
)
824 if len(cluster_roles
.items
) > 0:
826 "Cluster role with metadata.name={} already exists".format(name
)
829 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
831 cluster_role
= V1ClusterRole(
834 V1PolicyRule(api_groups
=["*"], resources
=["*"], verbs
=["*"]),
835 V1PolicyRule(non_resource_ur_ls
=["*"], verbs
=["*"]),
839 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role(cluster_role
)
841 def _delete_cluster_role(self
, kubectl
: Kubectl
, name
: str):
842 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role(name
)
844 def _create_service_account(
848 labels
: Dict
[str, str],
850 service_accounts
= kubectl
.clients
[CORE_CLIENT
].list_namespaced_service_account(
851 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
853 if len(service_accounts
.items
) > 0:
855 "Service account with metadata.name={} already exists".format(name
)
858 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
859 service_account
= V1ServiceAccount(metadata
=metadata
)
861 kubectl
.clients
[CORE_CLIENT
].create_namespaced_service_account(
862 ADMIN_NAMESPACE
, service_account
865 def _delete_service_account(self
, kubectl
: Kubectl
, name
: str):
866 kubectl
.clients
[CORE_CLIENT
].delete_namespaced_service_account(
867 name
, ADMIN_NAMESPACE
870 def _create_cluster_role_binding(
874 labels
: Dict
[str, str],
876 role_bindings
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role_binding(
877 field_selector
="metadata.name={}".format(name
)
879 if len(role_bindings
.items
) > 0:
880 raise Exception("Generated rbac id already exists")
882 role_binding
= V1ClusterRoleBinding(
883 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
884 role_ref
=V1RoleRef(kind
="ClusterRole", name
=name
, api_group
=""),
886 V1Subject(kind
="ServiceAccount", name
=name
, namespace
=ADMIN_NAMESPACE
)
889 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role_binding(role_binding
)
891 def _delete_cluster_role_binding(self
, kubectl
: Kubectl
, name
: str):
892 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role_binding(name
)
894 async def _get_secret_data(self
, kubectl
: Kubectl
, name
: str) -> (str, str):
895 v1_core
= kubectl
.clients
[CORE_CLIENT
]
901 service_accounts
= v1_core
.list_namespaced_service_account(
902 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
904 if len(service_accounts
.items
) == 0:
906 "Service account not found with metadata.name={}".format(name
)
908 service_account
= service_accounts
.items
[0]
909 if service_account
.secrets
and len(service_account
.secrets
) > 0:
910 secret_name
= service_account
.secrets
[0].name
911 if secret_name
is not None or not retries_limit
:
915 "Failed getting the secret from service account {}".format(name
)
917 secret
= v1_core
.list_namespaced_secret(
919 field_selector
="metadata.name={}".format(secret_name
),
922 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
923 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
926 base64
.b64decode(token
).decode("utf-8"),
927 base64
.b64decode(client_certificate_data
).decode("utf-8"),
931 def generate_kdu_instance_name(**kwargs
):
932 db_dict
= kwargs
.get("db_dict")
933 kdu_name
= kwargs
.get("kdu_name", None)
935 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
937 kdu_instance
= db_dict
["filter"]["_id"]
940 async def _get_libjuju(self
, vca_id
: str = None) -> Libjuju
:
944 :param: vca_id: VCA ID
945 If None, get a libjuju object with a Connection to the default VCA
946 Else, geta libjuju object with a Connection to the specified VCA
949 while self
.loading_libjuju
.locked():
950 await asyncio
.sleep(0.1)
952 async with self
.loading_libjuju
:
953 vca_connection
= await get_connection(self
._store
)
954 self
.libjuju
= Libjuju(vca_connection
, loop
=self
.loop
, log
=self
.log
)
957 vca_connection
= await get_connection(self
._store
, vca_id
)