OSM SOL005 client API
"""
-#from osmclient.v1 import vca
+# from osmclient.v1 import vca
from osmclient.sol005 import vnfd
from osmclient.sol005 import nsd
from osmclient.sol005 import nst
from osmclient.sol005 import sdncontroller
from osmclient.sol005 import project as projectmodule
from osmclient.sol005 import user as usermodule
+from osmclient.sol005 import role
from osmclient.sol005 import pdud
-from osmclient.common.exceptions import ClientException
+from osmclient.sol005 import k8scluster
+from osmclient.sol005 import vca
+from osmclient.sol005 import repo
+from osmclient.sol005 import osmrepo
+from osmclient.sol005 import subscription
+from osmclient.common import package_tool
import json
+import logging
-class Client(object):
+class Client(object):
def __init__(
self,
host=None,
so_port=9999,
- user='admin',
- password='admin',
- project='admin',
- **kwargs):
+ user="admin",
+ password="admin",
+ project="admin",
+ **kwargs
+ ):
self._user = user
self._password = password
self._project = project
- self._auth_endpoint = '/admin/v1/tokens'
+ self._project_domain_name = kwargs.get("project_domain_name")
+ self._user_domain_name = kwargs.get("user_domain_name")
+ self._logger = logging.getLogger("osmclient")
+ self._auth_endpoint = "/admin/v1/tokens"
self._headers = {}
-
- if len(host.split(':')) > 1:
+ self._token = None
+ if len(host.split(":")) > 1:
# backwards compatible, port provided as part of host
- self._host = host.split(':')[0]
- self._so_port = host.split(':')[1]
+ self._host = host.split(":")[0]
+ self._so_port = host.split(":")[1]
else:
self._host = host
self._so_port = so_port
self._http_client = http.Http(
- 'https://{}:{}/osm'.format(self._host,self._so_port))
- self._headers['Accept'] = 'application/json'
- self._headers['Content-Type'] = 'application/yaml'
- http_header = ['{}: {}'.format(key,val)
- for (key,val) in list(self._headers.items())]
- self._http_client.set_http_header(http_header)
-
- token = self.get_token()
- if not token:
- raise ClientException(
- 'Authentication error: not possible to get auth token')
- self._headers['Authorization'] = 'Bearer {}'.format(token)
- http_header.append('Authorization: Bearer {}'.format(token))
+ "https://{}:{}/osm".format(self._host, self._so_port), **kwargs
+ )
+ self._headers["Accept"] = "application/json"
+ self._headers["Content-Type"] = "application/yaml"
+ http_header = [
+ "{}: {}".format(key, val) for (key, val) in list(self._headers.items())
+ ]
self._http_client.set_http_header(http_header)
self.vnfd = vnfd.Vnfd(self._http_client, client=self)
self.vnf = vnf.Vnf(self._http_client, client=self)
self.project = projectmodule.Project(self._http_client, client=self)
self.user = usermodule.User(self._http_client, client=self)
+ self.role = role.Role(self._http_client, client=self)
self.pdu = pdud.Pdu(self._http_client, client=self)
- '''
+ self.k8scluster = k8scluster.K8scluster(self._http_client, client=self)
+ self.vca = vca.VCA(self._http_client, client=self)
+ self.repo = repo.Repo(self._http_client, client=self)
+ self.osmrepo = osmrepo.OSMRepo(self._http_client, client=self)
+ self.package_tool = package_tool.PackageTool(client=self)
+ self.subscription = subscription.Subscription(self._http_client, client=self)
+ """
self.vca = vca.Vca(http_client, client=self, **kwargs)
self.utils = utils.Utils(http_client, **kwargs)
- '''
+ """
def get_token(self):
- postfields_dict = {'username': self._user,
- 'password': self._password,
- 'project_id': self._project}
- http_code, resp = self._http_client.post_cmd(endpoint=self._auth_endpoint,
- postfields_dict=postfields_dict)
- if http_code not in (200, 201, 202, 204):
- raise ClientException(resp)
- token = json.loads(resp) if resp else None
- if token is not None:
- return token['_id']
- return None
+ self._logger.debug("")
+ if self._token is None:
+ postfields_dict = {
+ "username": self._user,
+ "password": self._password,
+ "project_id": self._project,
+ }
+ if self._project_domain_name:
+ postfields_dict["project_domain_name"] = self._project_domain_name
+ if self._user_domain_name:
+ postfields_dict["user_domain_name"] = self._user_domain_name
+ http_code, resp = self._http_client.post_cmd(
+ endpoint=self._auth_endpoint,
+ postfields_dict=postfields_dict,
+ skip_query_admin=True,
+ )
+ # if http_code not in (200, 201, 202, 204):
+ # message ='Authentication error: not possible to get auth token\nresp:\n{}'.format(resp)
+ # raise ClientException(message)
+ token = json.loads(resp) if resp else None
+ self._token = token["id"]
+
+ if self._token is not None:
+ self._headers["Authorization"] = "Bearer {}".format(self._token)
+ http_header = [
+ "{}: {}".format(key, val)
+ for (key, val) in list(self._headers.items())
+ ]
+ self._http_client.set_http_header(http_header)
+
+ def get_version(self):
+ _, resp = self._http_client.get2_cmd(endpoint="/version", skip_query_admin=True)
+ # print(http_code, resp)
+ try:
+ resp = json.loads(resp)
+ version = resp.get("version")
+ date = resp.get("date")
+ except ValueError:
+ version = resp.split()[2]
+ date = resp.split()[4]
+ return "{} {}".format(version, date)
+
+ def set_default_params(self, **kwargs):
+ host = kwargs.pop("host", None)
+ if host is not None:
+ self._host = host
+ port = kwargs.pop("port", None)
+ if port is not None:
+ self._so_port = port
+ self._http_client.set_query_admin(**kwargs)