Code Coverage

Cobertura Coverage Report > osmclient.sol005 >

vca.py

Trend

Classes100%
 
Lines100%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
vca.py
100%
1/1
100%
64/64
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
vca.py
100%
64/64
N/A

Source

osmclient/sol005/vca.py
1 # Copyright 2021 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 """
16 OSM VCA API handling
17 """
18
19 1 from osmclient.common import utils
20 1 from osmclient.common.exceptions import NotFound
21 1 from osmclient.common.exceptions import ClientException
22 1 import json
23
24
class VCA(object):
    """Client-side wrapper around the ``/admin/v1/vca`` REST resource.

    Every public method first asks the client to refresh its token
    (``client.get_token()``) and then issues the request through the
    supplied ``http`` helper.
    """

    def __init__(self, http=None, client=None):
        """Store the collaborators and precompute the resource base path.

        :param http: object exposing ``post_cmd``, ``patch_cmd``,
            ``delete_cmd`` and ``get2_cmd``.
        :param client: object exposing ``get_token()``.
        """
        self._http = http
        self._client = client
        self._apiName = "/admin"
        self._apiVersion = "/v1"
        self._apiResource = "/vca"
        self._apiBase = "{}{}{}".format(
            self._apiName, self._apiVersion, self._apiResource
        )

    def create(self, name, vca):
        """Create a VCA and print the id returned by the server.

        :param name: accepted for interface compatibility; this method does
            not read it (the payload in ``vca`` is sent as-is).
        :param vca: dictionary sent as the request body.
        :raises ClientException: if the response carries no ``"id"`` key.
        """
        self._client.get_token()
        # Only the body is inspected; the status code is not needed here.
        _, resp = self._http.post_cmd(endpoint=self._apiBase, postfields_dict=vca)
        resp = json.loads(resp) if resp else {}
        if "id" not in resp:
            raise ClientException("unexpected response from server - {}".format(resp))
        print(resp["id"])

    def update(self, name, vca):
        """Patch an existing VCA, identified by name or id.

        :param name: VCA name or id, resolved through :meth:`get`.
        :param vca: dictionary with the fields to change.
        :raises NotFound: propagated from :meth:`get` when nothing matches.
        """
        self._client.get_token()
        vca_id = self.get(name)["_id"]
        self._http.patch_cmd(
            endpoint="{}/{}".format(self._apiBase, vca_id),
            postfields_dict=vca,
        )

    def get_id(self, name):
        """Return the ``_id`` of the first listed VCA whose name is ``name``.

        :raises NotFound: if no listed VCA carries that name.
        """
        for vca in self.list():
            if name == vca["name"]:
                return vca["_id"]
        raise NotFound("VCA {} not found".format(name))

    def delete(self, name, force=False):
        """Delete a VCA, identified by name or id.

        :param name: VCA name or id; anything ``utils.validate_uuid4``
            rejects is treated as a name and resolved with :meth:`get_id`.
        :param force: when true, ``?FORCE=True`` is appended to the request.
        :raises ClientException: if the status code is neither 202 nor 204.
        """
        self._client.get_token()
        vca_id = name
        if not utils.validate_uuid4(name):
            vca_id = self.get_id(name)
        querystring = "?FORCE=True" if force else ""
        http_code, resp = self._http.delete_cmd(
            "{}/{}{}".format(self._apiBase, vca_id, querystring)
        )
        if http_code == 202:
            print("Deletion in progress")
        elif http_code == 204:
            print("Deleted")
        else:
            msg = resp or ""
            raise ClientException("failed to delete VCA {} - {}".format(name, msg))

    def list(self, cmd_filter=None):
        """Return the list of VCAs, optionally filtered.

        :param cmd_filter: query string (without the leading ``?``) appended
            to the request when truthy.
        :returns: the decoded JSON response, or an empty list when the
            response body is empty.
        """
        self._client.get_token()
        filter_string = ""
        if cmd_filter:
            filter_string = "?{}".format(cmd_filter)
        _, resp = self._http.get2_cmd("{}{}".format(self._apiBase, filter_string))
        if resp:
            return json.loads(resp)
        return []

    def get(self, name):
        """Return a VCA based on name or id.

        :param name: VCA name or id; anything ``utils.validate_uuid4``
            rejects is treated as a name and resolved with :meth:`get_id`.
        :returns: the decoded JSON document of the VCA.
        :raises NotFound: if the name cannot be resolved or the request
            itself raises ``NotFound``.
        :raises ClientException: if the response carries no ``"_id"`` key.
        """
        self._client.get_token()
        vca_id = name
        if not utils.validate_uuid4(name):
            vca_id = self.get_id(name)
        try:
            _, resp = self._http.get2_cmd("{}/{}".format(self._apiBase, vca_id))
        except NotFound as exc:
            # Re-raise with the caller's identifier, keeping the original
            # error attached as the explicit cause.
            raise NotFound("VCA {} not found".format(name)) from exc
        resp = json.loads(resp) if resp else {}
        if "_id" not in resp:
            raise ClientException("failed to get VCA info: {}".format(resp))
        return resp