# blob: 70831e44c138580ee1b8b6e6bc2eec094253c640
# Copyright 2020 ETSI OSM
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from subprocess import run, PIPE
import requests
class OSMException(Exception):
    """Root of the exception hierarchy defined by this OSM helper module."""
class ResourceException(OSMException):
    """Error raised when an operation on an OSM resource fails.

    Positional arguments are forwarded to the base exception unchanged.
    An optional ``message`` keyword argument is stored on the instance as
    ``self.message`` (``None`` when it is not supplied); any other keyword
    arguments are ignored.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        self.message = kwargs.get('message')

    def __str__(self):
        """Return the exception text, falling back to ``message``.

        Without this fallback an instance created with only ``message=...``
        would render as an empty string in tracebacks and logs.
        """
        text = super().__str__()
        if text or self.message is None:
            return text
        # ``message`` may be raw bytes (e.g. captured process output);
        # decode leniently so rendering the error can never itself fail.
        if isinstance(self.message, bytes):
            return self.message.decode("utf-8", errors="replace")
        return str(self.message)
class OSM:
    """Small helper around the ``osm`` command-line client and the OSM REST API.

    Resource operations (create/delete NSD and VNFD) shell out to the ``osm``
    executable; ``get_auth_token`` talks to the HTTP API on ``osm_hostname``.
    """

    def __init__(self, osm_hostname):
        """Remember the hostname used to build OSM API URLs."""
        self.osm_hostname = osm_hostname

    def run_command(self, command):
        """Run ``osm <command>`` through the shell and return its output.

        Args:
            command: Sub-command and arguments appended after ``osm``. The
                text is interpreted by the shell, so callers must not pass
                untrusted input.

        Returns:
            The command's standard output, decoded as UTF-8 and stripped.

        Raises:
            ResourceException: If the command exits with a non-zero status.
                The raw stdout bytes are attached as ``message``.
        """
        # With shell=True the command must be one string: a list is
        # discouraged by the subprocess docs and, on Windows, gets quoted
        # into a single (non-existent) program name.
        p = run("osm {}".format(command), shell=True, stdout=PIPE, stderr=PIPE)
        if p.returncode != 0:
            # Surface both streams to help diagnose the failure.
            print(p.stdout)
            print(p.stderr)
            raise ResourceException(message=p.stdout)
        return self.clean_output(p.stdout)

    def get_auth_token(self, username="admin", password="admin",
                       project="admin", timeout=30):
        """Request an authentication token from the OSM API and return its id.

        Args:
            username: Account name sent in the token request.
            password: Password sent in the token request.
            project: Project name sent in the token request.
            timeout: Seconds to wait for the server before ``requests``
                gives up; without it the call could block indefinitely.

        Returns:
            The value of the ``"id"`` field of the JSON response.
        """
        url = "https://{}:9999/osm/admin/v1/tokens".format(self.osm_hostname)
        credentials = {
            "username": username,
            "password": password,
            "project": project,
        }
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept for compatibility (presumably self-signed certs) - confirm.
        r = requests.post(
            url,
            verify=False,
            headers={"Accept": "application/json"},
            json=credentials,
            timeout=timeout,
        )
        return r.json()["id"]

    def clean_output(self, output):
        """Decode ``output`` bytes as UTF-8 and strip surrounding whitespace."""
        return output.decode("utf-8").strip()

    def create_nsd(self, filename):
        """Run ``osm nsd-create <filename>`` and return its cleaned output."""
        return self.run_command("nsd-create {}".format(filename))

    def create_vnfd(self, filename):
        """Run ``osm vnfd-create <filename>`` and return its cleaned output."""
        return self.run_command("vnfd-create {}".format(filename))

    def delete_nsd(self, id):
        """Run ``osm nsd-delete <id>`` and return its cleaned output."""
        return self.run_command("nsd-delete {}".format(id))

    def delete_vnfd(self, id):
        """Run ``osm vnfd-delete <id>`` and return its cleaned output."""
        return self.run_command("vnfd-delete {}".format(id))