# See the License for the specific language governing permissions and
# limitations under the License.
+
class JujuCharmNotFound(Exception):
"""The Charm can't be found or is not readable."""
def __str__(self):
return '<{}> Invalid certificate: {}'.format(type(self), super().__str__())
+
+
+class K8sException(Exception):
+ """
+ K8s exception
+ """
+
+ def __init__(self, message: str):
+ Exception.__init__(self, message)
+ self._message = message
+
+ def __str__(self):
+ return self._message
+
+ def __repr__(self):
+ return self._message
from uuid import uuid4
import random
from n2vc.k8s_conn import K8sConnector
+from n2vc.exceptions import K8sException
class K8sHelmConnector(K8sConnector):
msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
.format(cluster_uuid)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
if uninstall_sw:
else:
msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
self.error(msg)
- # raise Exception(msg)
self.debug('namespace for tiller: {}'.format(namespace))
version_str = '--version {}'.format(parts[1])
kdu_model = parts[0]
- # generate a name for the releas. Then, check if already exists
+ # generate a name for the release. Then, check if already exists
kdu_instance = None
while kdu_instance is None:
kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
self.debug('Returning kdu_instance {}'.format(kdu_instance))
return kdu_instance
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# return new revision number
instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# return new revision number
instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
def get_random_number():
r = random.randrange(start=1, stop=99999999)
s = str(r)
- s = s.rjust(width=10, fillchar=' ')
+ s = s.rjust(10, '0')
return s
name = name + get_random_number()
if not os.path.exists(cluster_dir):
msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# kube dir
kube_dir = cluster_dir + '/' + '.kube'
if not os.path.exists(kube_dir):
msg = 'Kube config dir {} does not exist'.format(kube_dir)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# helm home dir
helm_dir = cluster_dir + '/' + '.helm'
if not os.path.exists(helm_dir):
msg = 'Helm config dir {} does not exist'.format(helm_dir)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
config_filename = kube_dir + '/config'
return kube_dir, helm_dir, config_filename, cluster_dir
msg = 'File {} does not exist'.format(filename)
if exception_if_not_exists:
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
if credentials is None:
raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])
- if 'hostname' in credentials:
+ if credentials.get('hostname'):
hostname = credentials['hostname']
else:
raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])
- if 'username' in credentials:
+ if credentials.get('username'):
username = credentials['username']
else:
raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])