import abc
import asyncio
+from typing import Union
import time
from n2vc.loggable import Loggable
################################### P U B L I C ####################################
####################################################################################
"""
+
@staticmethod
def generate_kdu_instance_name(**kwargs):
raise NotImplementedError("Method not implemented")
@abc.abstractmethod
async def scale(
- self, kdu_instance: str,
- scale: int,
- resource_name: str,
- total_timeout: float = 1800,
- **kwargs,
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
) -> bool:
"""
Scales an application in KDU instance.
@abc.abstractmethod
async def get_scale_count(
- self,
- resource_name: str,
- kdu_instance: str,
- **kwargs,
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
) -> int:
"""
Get an application scale count.
"""
@abc.abstractmethod
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+ async def status_kdu(
+ self, cluster_uuid: str, kdu_instance: str, yaml_format: str
+ ) -> Union[str, dict]:
"""
This call would retrieve tha current state of a given KDU instance. It would be
would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance
+ :param yaml_format: if the return shall be returned as an YAML string or as a
+ dictionary
:return: If successful, it will return the following vector of arguments:
- K8s `namespace` in the cluster where the KDU lives
- `state` of the KDU instance. It can be:
"""
@abc.abstractmethod
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str) -> list:
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
"""
Returns a list of services defined for the specified kdu instance.
"""
@abc.abstractmethod
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str = None) -> object:
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str = None
+ ) -> object:
"""
Obtains the data of the specified service in the k8cluster.
async def write_app_status_to_db(
self, db_dict: dict, status: str, detailed_status: str, operation: str
) -> bool:
+ """
+ This method will write the status of the application to the database.
+
+ :param db_dict: A dictionary with the database necessary information. It shall contain the values for the keys:
+ - "collection": The Mongo DB collection to write to
+ - "filter": The query filter to use in the update process
+ - "path": The dot separated keys which targets the object to be updated
+ :param status: Status of the application
+ :param detailed_status: Detailed status of the application
+ :param operation: Operation that is being performed on the application
+ :return: True if successful
+ """
if not self.db:
self.warning("No db => No database write")
self.log.debug("status={}".format(status))
try:
-
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]