OSM SOL005 client API
"""
-#from osmclient.v1 import vca
+# from osmclient.v1 import vca
from osmclient.sol005 import vnfd
from osmclient.sol005 import nsd
from osmclient.sol005 import nst
from osmclient.sol005 import role
from osmclient.sol005 import pdud
from osmclient.sol005 import k8scluster
+from osmclient.sol005 import vca
from osmclient.sol005 import repo
+from osmclient.sol005 import osmrepo
+from osmclient.sol005 import subscription
from osmclient.common import package_tool
import json
import logging
class Client(object):
-
def __init__(
self,
host=None,
so_port=9999,
- user='admin',
- password='admin',
- project='admin',
- **kwargs):
+ user="admin",
+ password="admin",
+ project="admin",
+ **kwargs
+ ):
self._user = user
self._password = password
self._project = project
self._project_domain_name = kwargs.get("project_domain_name")
self._user_domain_name = kwargs.get("user_domain_name")
- self._logger = logging.getLogger('osmclient')
- self._auth_endpoint = '/admin/v1/tokens'
+ self._logger = logging.getLogger("osmclient")
+ self._auth_endpoint = "/admin/v1/tokens"
self._headers = {}
self._token = None
- if len(host.split(':')) > 1:
+ if len(host.split(":")) > 1:
# backwards compatible, port provided as part of host
- self._host = host.split(':')[0]
- self._so_port = host.split(':')[1]
+ self._host = host.split(":")[0]
+ self._so_port = host.split(":")[1]
else:
self._host = host
self._so_port = so_port
self._http_client = http.Http(
- 'https://{}:{}/osm'.format(self._host,self._so_port), **kwargs)
- self._headers['Accept'] = 'application/json'
- self._headers['Content-Type'] = 'application/yaml'
- http_header = ['{}: {}'.format(key, val)
- for (key, val) in list(self._headers.items())]
+ "https://{}:{}/osm".format(self._host, self._so_port), **kwargs
+ )
+ self._headers["Accept"] = "application/json"
+ self._headers["Content-Type"] = "application/yaml"
+ http_header = [
+ "{}: {}".format(key, val) for (key, val) in list(self._headers.items())
+ ]
self._http_client.set_http_header(http_header)
self.vnfd = vnfd.Vnfd(self._http_client, client=self)
self.role = role.Role(self._http_client, client=self)
self.pdu = pdud.Pdu(self._http_client, client=self)
self.k8scluster = k8scluster.K8scluster(self._http_client, client=self)
+ self.vca = vca.VCA(self._http_client, client=self)
self.repo = repo.Repo(self._http_client, client=self)
+ self.osmrepo = osmrepo.OSMRepo(self._http_client, client=self)
self.package_tool = package_tool.PackageTool(client=self)
- '''
+ self.subscription = subscription.Subscription(self._http_client, client=self)
+ """
self.vca = vca.Vca(http_client, client=self, **kwargs)
self.utils = utils.Utils(http_client, **kwargs)
- '''
+ """
def get_token(self):
self._logger.debug("")
if self._token is None:
- postfields_dict = {'username': self._user,
- 'password': self._password,
- 'project_id': self._project}
+ postfields_dict = {
+ "username": self._user,
+ "password": self._password,
+ "project_id": self._project,
+ }
if self._project_domain_name:
postfields_dict["project_domain_name"] = self._project_domain_name
if self._user_domain_name:
postfields_dict["user_domain_name"] = self._user_domain_name
- http_code, resp = self._http_client.post_cmd(endpoint=self._auth_endpoint,
- postfields_dict=postfields_dict,
- skip_query_admin=True)
-# if http_code not in (200, 201, 202, 204):
-# message ='Authentication error: not possible to get auth token\nresp:\n{}'.format(resp)
-# raise ClientException(message)
+ http_code, resp = self._http_client.post_cmd(
+ endpoint=self._auth_endpoint,
+ postfields_dict=postfields_dict,
+ skip_query_admin=True,
+ )
+ # if http_code not in (200, 201, 202, 204):
+ # message ='Authentication error: not possible to get auth token\nresp:\n{}'.format(resp)
+ # raise ClientException(message)
token = json.loads(resp) if resp else None
- self._token = token['id']
+ self._token = token["id"]
if self._token is not None:
- self._headers['Authorization'] = 'Bearer {}'.format(self._token)
- http_header = ['{}: {}'.format(key, val)
- for (key, val) in list(self._headers.items())]
+ self._headers["Authorization"] = "Bearer {}".format(self._token)
+ http_header = [
+ "{}: {}".format(key, val)
+ for (key, val) in list(self._headers.items())
+ ]
self._http_client.set_http_header(http_header)
def get_version(self):
_, resp = self._http_client.get2_cmd(endpoint="/version", skip_query_admin=True)
- #print(http_code, resp)
+ # print(http_code, resp)
try:
resp = json.loads(resp)
version = resp.get("version")
return "{} {}".format(version, date)
def set_default_params(self, **kwargs):
- host = kwargs.pop('host', None)
- if host != None:
- self._host=host
- port = kwargs.pop('port', None)
- if port != None:
- self._so_port=port
+ host = kwargs.pop("host", None)
+ if host is not None:
+ self._host = host
+ port = kwargs.pop("port", None)
+ if port is not None:
+ self._so_port = port
self._http_client.set_query_admin(**kwargs)