Coverage for osmclient/sol005/client.py: 31%
93 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:50 +0000
1# Copyright 2018 Telefonica
2#
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17"""
18OSM SOL005 client API
19"""
21from osmclient.sol005 import vnfd
22from osmclient.sol005 import nsd
23from osmclient.sol005 import nst
24from osmclient.sol005 import nsi
25from osmclient.sol005 import ns
26from osmclient.sol005 import vnf
27from osmclient.sol005 import vim
28from osmclient.sol005 import wim
29from osmclient.sol005 import package
30from osmclient.sol005 import http
31from osmclient.sol005 import sdncontroller
32from osmclient.sol005 import project as projectmodule
33from osmclient.sol005 import user as usermodule
34from osmclient.sol005 import role
35from osmclient.sol005 import pdud
36from osmclient.sol005 import k8scluster
37from osmclient.sol005 import vca
38from osmclient.sol005 import repo
39from osmclient.sol005 import osmrepo
40from osmclient.sol005 import subscription
41from osmclient.common import package_tool
42from osmclient.common.exceptions import ClientException
43import json
44import logging
class Client(object):
    """Entry point of the OSM SOL005 client API.

    Creates a single HTTP client for ``https://<host>:<so_port>/osm`` and one
    helper object per resource type (``vnfd``, ``nsd``, ``ns``, ``vim``, ...),
    each built with that HTTP client and a back-reference to this instance.
    """

    def __init__(
        self,
        host=None,
        so_port=9999,
        user="admin",
        password="admin",
        project="admin",
        **kwargs
    ):
        """Store the credentials and build the HTTP client and resource helpers.

        Args:
            host: Server host name, optionally in ``"host:port"`` form. It is
                required in practice even though the signature defaults it to
                ``None``.
            so_port: Port used when ``host`` does not carry one.
            user: User name sent when requesting a token.
            password: Password sent when requesting a token.
            project: Project sent as ``project_id`` when requesting a token.
            **kwargs: ``project_domain_name`` and ``user_domain_name`` are read
                here; the complete mapping is also forwarded to ``http.Http``.

        Raises:
            ClientException: If ``host`` is ``None``.
        """
        if host is None:
            # Previously this surfaced as an opaque AttributeError raised by
            # ``None.split``; fail with an explicit client error instead.
            raise ClientException("host must be provided to create the client")

        self._user = user
        self._password = password
        self._project = project
        self._project_domain_name = kwargs.get("project_domain_name")
        self._user_domain_name = kwargs.get("user_domain_name")
        self._logger = logging.getLogger("osmclient")
        self._auth_endpoint = "/admin/v1/tokens"
        self._headers = {}
        self._token = None

        host_parts = host.split(":")
        if len(host_parts) > 1:
            # backwards compatible, port provided as part of host
            self._host = host_parts[0]
            self._so_port = host_parts[1]
        else:
            self._host = host
            self._so_port = so_port

        self._http_client = http.Http(
            "https://{}:{}/osm".format(self._host, self._so_port), **kwargs
        )
        self._headers["Accept"] = "application/json"
        self._headers["Content-Type"] = "application/yaml"
        self._http_client.set_http_header(self._headers)

        # One helper per resource type, all sharing the same HTTP client.
        self.vnfd = vnfd.Vnfd(self._http_client, client=self)
        self.nsd = nsd.Nsd(self._http_client, client=self)
        self.nst = nst.Nst(self._http_client, client=self)
        self.package = package.Package(self._http_client, client=self)
        self.ns = ns.Ns(self._http_client, client=self)
        self.nsi = nsi.Nsi(self._http_client, client=self)
        self.vim = vim.Vim(self._http_client, client=self)
        self.wim = wim.Wim(self._http_client, client=self)
        self.sdnc = sdncontroller.SdnController(self._http_client, client=self)
        self.vnf = vnf.Vnf(self._http_client, client=self)
        self.project = projectmodule.Project(self._http_client, client=self)
        self.user = usermodule.User(self._http_client, client=self)
        self.role = role.Role(self._http_client, client=self)
        self.pdu = pdud.Pdu(self._http_client, client=self)
        self.k8scluster = k8scluster.K8scluster(self._http_client, client=self)
        self.vca = vca.VCA(self._http_client, client=self)
        self.repo = repo.Repo(self._http_client, client=self)
        self.osmrepo = osmrepo.OSMRepo(self._http_client, client=self)
        self.package_tool = package_tool.PackageTool(client=self)
        self.subscription = subscription.Subscription(self._http_client, client=self)

    def get_token(self, pwd_change=False):
        """Request an auth token once and install it as a Bearer header.

        When no token is cached, posts the stored credentials to the auth
        endpoint, caches the ``id`` field of the JSON reply and adds an
        ``Authorization: Bearer <id>`` header to the shared HTTP client.

        Args:
            pwd_change: When true, a reply whose ``message`` is
                ``"change_password"`` is accepted instead of being rejected.

        Returns:
            The decoded token reply when a token was requested by this call,
            or ``None`` when a token was already cached.

        Raises:
            ClientException: If the reply is empty, or if it reports
                ``"change_password"`` and ``pwd_change`` is false.
        """
        self._logger.debug("")
        if self._token is not None:
            # Already authenticated: the header was installed by the call that
            # obtained the token, so there is nothing left to do.
            return None

        postfields_dict = {
            "username": self._user,
            "password": self._password,
            "project_id": self._project,
        }
        if self._project_domain_name:
            postfields_dict["project_domain_name"] = self._project_domain_name
        if self._user_domain_name:
            postfields_dict["user_domain_name"] = self._user_domain_name

        # The status code is not inspected here.
        # NOTE(review): presumably post_cmd raises on HTTP errors - confirm.
        _, resp = self._http_client.post_cmd(
            endpoint=self._auth_endpoint,
            postfields_dict=postfields_dict,
            skip_query_admin=True,
        )
        if not resp:
            # An empty body used to crash later with AttributeError on None.
            raise ClientException(
                "Authentication error: not possible to get auth token (empty response)"
            )

        token = json.loads(resp)
        if token.get("message") == "change_password" and not pwd_change:
            raise ClientException(
                "Password Expired. Please update the password using change_password option"
            )
        self._token = token["id"]

        if self._token is not None:
            self._headers["Authorization"] = "Bearer {}".format(self._token)
            self._http_client.set_http_header(self._headers)
        return token

    def get_version(self):
        """Return the server version as a ``"<version> <date>"`` string.

        Queries the ``/version`` endpoint. A JSON reply supplies the
        ``version`` and ``date`` fields; a reply that is not valid JSON is
        split on whitespace and its third and fifth words are used instead.
        """
        _, resp = self._http_client.get2_cmd(endpoint="/version", skip_query_admin=True)
        try:
            info = json.loads(resp)
            version = info.get("version")
            date = info.get("date")
        except ValueError:
            # NOTE(review): presumably a legacy plain-text version banner -
            # confirm the expected word positions against the server.
            words = resp.split()
            version = words[2]
            date = words[4]
        return "{} {}".format(version, date)