Coverage for osmclient/sol005/client.py: 31%

93 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-29 09:50 +0000

1# Copyright 2018 Telefonica 

2# 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17""" 

18OSM SOL005 client API 

19""" 

20 

21from osmclient.sol005 import vnfd 

22from osmclient.sol005 import nsd 

23from osmclient.sol005 import nst 

24from osmclient.sol005 import nsi 

25from osmclient.sol005 import ns 

26from osmclient.sol005 import vnf 

27from osmclient.sol005 import vim 

28from osmclient.sol005 import wim 

29from osmclient.sol005 import package 

30from osmclient.sol005 import http 

31from osmclient.sol005 import sdncontroller 

32from osmclient.sol005 import project as projectmodule 

33from osmclient.sol005 import user as usermodule 

34from osmclient.sol005 import role 

35from osmclient.sol005 import pdud 

36from osmclient.sol005 import k8scluster 

37from osmclient.sol005 import vca 

38from osmclient.sol005 import repo 

39from osmclient.sol005 import osmrepo 

40from osmclient.sol005 import subscription 

41from osmclient.common import package_tool 

42from osmclient.common.exceptions import ClientException 

43import json 

44import logging 

45 

46 

class Client(object):
    """Entry point of the OSM SOL005 client.

    Builds a single HTTP client for the ``https://<host>:<port>/osm`` base URL
    and exposes one handler object per resource type (``vnfd``, ``nsd``,
    ``ns``, ``vim``, ...), each sharing that HTTP client and holding a
    reference back to this ``Client``.
    """

    def __init__(
        self,
        host=None,
        so_port=9999,
        user="admin",
        password="admin",
        project="admin",
        **kwargs
    ):
        """Store the credentials and create the per-resource handlers.

        Args:
            host: Server host name, optionally in ``"host:port"`` form. It is
                effectively required: the default ``None`` fails with
                ``AttributeError`` when the value is split on ``":"``.
            so_port: Port used when ``host`` does not embed one.
            user: User name sent when requesting a token.
            password: Password sent when requesting a token.
            project: Project sent as ``project_id`` when requesting a token.
            **kwargs: Forwarded unchanged to ``http.Http``. The optional keys
                ``project_domain_name`` and ``user_domain_name`` are also read
                here and added to the token request when set.
        """
        self._user = user
        self._password = password
        self._project = project
        self._project_domain_name = kwargs.get("project_domain_name")
        self._user_domain_name = kwargs.get("user_domain_name")
        self._logger = logging.getLogger("osmclient")
        self._auth_endpoint = "/admin/v1/tokens"
        self._headers = {}
        self._token = None

        host_parts = host.split(":")
        if len(host_parts) > 1:
            # backwards compatible, port provided as part of host
            # (the embedded port stays a string and overrides so_port)
            self._host = host_parts[0]
            self._so_port = host_parts[1]
        else:
            self._host = host
            self._so_port = so_port

        self._http_client = http.Http(
            "https://{}:{}/osm".format(self._host, self._so_port), **kwargs
        )
        self._headers["Accept"] = "application/json"
        self._headers["Content-Type"] = "application/yaml"
        self._http_client.set_http_header(self._headers)

        # One handler per resource type, all sharing the same HTTP client.
        self.vnfd = vnfd.Vnfd(self._http_client, client=self)
        self.nsd = nsd.Nsd(self._http_client, client=self)
        self.nst = nst.Nst(self._http_client, client=self)
        self.package = package.Package(self._http_client, client=self)
        self.ns = ns.Ns(self._http_client, client=self)
        self.nsi = nsi.Nsi(self._http_client, client=self)
        self.vim = vim.Vim(self._http_client, client=self)
        self.wim = wim.Wim(self._http_client, client=self)
        self.sdnc = sdncontroller.SdnController(self._http_client, client=self)
        self.vnf = vnf.Vnf(self._http_client, client=self)
        self.project = projectmodule.Project(self._http_client, client=self)
        self.user = usermodule.User(self._http_client, client=self)
        self.role = role.Role(self._http_client, client=self)
        self.pdu = pdud.Pdu(self._http_client, client=self)
        self.k8scluster = k8scluster.K8scluster(self._http_client, client=self)
        self.vca = vca.VCA(self._http_client, client=self)
        self.repo = repo.Repo(self._http_client, client=self)
        self.osmrepo = osmrepo.OSMRepo(self._http_client, client=self)
        self.package_tool = package_tool.PackageTool(client=self)
        self.subscription = subscription.Subscription(self._http_client, client=self)

    def get_token(self, pwd_change=False):
        """Authenticate once and install the bearer token on the HTTP client.

        Args:
            pwd_change: When true, a ``"change_password"`` message in the
                token response is tolerated instead of raising.

        Returns:
            The decoded token response (a dict) when a new token was
            requested, or ``None`` when a token was already cached and no
            request was made.

        Raises:
            ClientException: If the response is empty or is not a JSON
                object, or if it reports ``"change_password"`` while
                ``pwd_change`` is false.
        """
        self._logger.debug("")
        if self._token is not None:
            # Already authenticated; the Authorization header was installed
            # when the token was first obtained.
            return None

        postfields_dict = {
            "username": self._user,
            "password": self._password,
            "project_id": self._project,
        }
        if self._project_domain_name:
            postfields_dict["project_domain_name"] = self._project_domain_name
        if self._user_domain_name:
            postfields_dict["user_domain_name"] = self._user_domain_name

        # NOTE(review): the HTTP status code is not inspected here; presumably
        # post_cmd reports error statuses itself -- confirm.
        _, resp = self._http_client.post_cmd(
            endpoint=self._auth_endpoint,
            postfields_dict=postfields_dict,
            skip_query_admin=True,
        )

        token = json.loads(resp) if resp else None
        if not isinstance(token, dict):
            # An empty or non-object body used to surface as an opaque
            # AttributeError on ``token.get``; report it as a client error.
            raise ClientException(
                "Authentication error: not possible to get auth token\nresp:\n{}".format(
                    resp
                )
            )
        if token.get("message") == "change_password" and not pwd_change:
            raise ClientException(
                "Password Expired. Please update the password using change_password option"
            )
        self._token = token["id"]

        if self._token is not None:
            self._headers["Authorization"] = "Bearer {}".format(self._token)
            self._http_client.set_http_header(self._headers)
        return token

    def get_version(self):
        """Return the server version and date as ``"<version> <date>"``.

        The ``/version`` response is parsed as JSON and its ``version`` and
        ``date`` keys are used. If the body is not valid JSON it is treated as
        whitespace-separated text, with the version taken from the third
        field and the date from the fifth.
        """
        _, resp = self._http_client.get2_cmd(endpoint="/version", skip_query_admin=True)
        try:
            info = json.loads(resp)
        except ValueError:
            # Not JSON: fall back to positional fields of the plain-text reply.
            fields = resp.split()
            version = fields[2]
            date = fields[4]
        else:
            version = info.get("version")
            date = info.get("date")
        return "{} {}".format(version, date)