From: Adam Israel Date: Wed, 31 Jan 2018 23:42:38 +0000 (-0500) Subject: Add juju support to lcm X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;ds=sidebyside;h=refs%2Fheads%2Flcm;p=osm%2FRO.git Add juju support to lcm This adds basic Juju support to the lcm Signed-off-by: Adam Israel --- diff --git a/lcm/juju_api.py b/lcm/juju_api.py new file mode 100644 index 00000000..9b8fa5ad --- /dev/null +++ b/lcm/juju_api.py @@ -0,0 +1,855 @@ +############################################################################ +# Copyright 2016 RIFT.io Inc # +# # +# Licensed under the Apache License, Version 2.0 (the "License"); # +# you may not use this file except in compliance with the License. # +# You may obtain a copy of the License at # +# # +# http://www.apache.org/licenses/LICENSE-2.0 # +# # +# Unless required by applicable law or agreed to in writing, software # +# distributed under the License is distributed on an "AS IS" BASIS, # +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# See the License for the specific language governing permissions and # +# limitations under the License. # +############################################################################ + +import argparse +import asyncio +import logging +import os +import ssl + +import juju.loop +from juju.controller import Controller +from juju.model import Model, ModelObserver + +try: + ssl._create_default_https_context = ssl._create_unverified_context +except AttributeError: + # Legacy Python doesn't verify by default (see pep-0476) + # https://www.python.org/dev/peps/pep-0476/ + pass + + +class JujuVersionError(Exception): + pass + + +class JujuApiError(Exception): + pass + + +class JujuEnvError(JujuApiError): + pass + + +class JujuModelError(JujuApiError): + pass + + +class JujuStatusError(JujuApiError): + pass + + +class JujuUnitsError(JujuApiError): + pass + + +class JujuWaitUnitsError(JujuApiError): + pass + + +class JujuSrvNotDeployedError(JujuApiError): + pass + + +class JujuAddCharmError(JujuApiError): + pass + + +class JujuDeployError(JujuApiError): + pass + + +class JujuDestroyError(JujuApiError): + pass + + +class JujuResolveError(JujuApiError): + pass + + +class JujuActionError(JujuApiError): + pass + + +class JujuActionApiError(JujuActionError): + pass + + +class JujuActionInfoError(JujuActionError): + pass + + +class JujuActionExecError(JujuActionError): + pass + + +class JujuAuthenticationError(Exception): + pass + + +class JujuMonitor(ModelObserver): + """Monitor state changes within the Juju Model.""" + # async def on_change(self, delta, old, new, model): + # """React to changes in the Juju model.""" + # + # # TODO: Setup the hook to update the UI if the status of a unit changes + # # to be used when deploying a charm and waiting for it to be "ready" + # if delta.entity in ['application', 'unit'] and delta.type == "change": + # pass + # + # # TODO: Add a hook when an action is complete + + pass + + +class JujuApi(object): + """JujuApi wrapper on jujuclient library. + + There should be one instance of JujuApi for each VNF manged by Juju. + + Assumption: + Currently we use one unit per service/VNF. So once a service + is deployed, we store the unit name and reuse it + """ + + log = None + controller = None + models = {} + model = None + model_name = None + model_uuid = None + authenticated = False + + def __init__(self, + log=None, + loop=None, + server='127.0.0.1', + port=17070, + user='admin', + secret=None, + version=None, + model_name='default', + ): + """Initialize with the Juju credentials.""" + + if log: + self.log = log + else: + self.log = logging.getLogger(__name__) + + # Quiet websocket traffic + logging.getLogger('websockets.protocol').setLevel(logging.INFO) + + self.log.debug('JujuApi: instantiated') + + self.server = server + self.port = port + + self.secret = secret + if user.startswith('user-'): + self.user = user + else: + self.user = 'user-{}'.format(user) + + self.endpoint = '%s:%d' % (server, int(port)) + + self.model_name = model_name + + if loop: + self.loop = loop + + def __del__(self): + """Close any open connections.""" + yield self.logout() + + async def add_relation(self, a, b, via=None): + """ + Add a relation between two application endpoints. + + :param a An application endpoint + :param b An application endpoint + :param via The egress subnet(s) for outbound traffic, e.g., (192.168.0.0/16,10.0.0.0/8) + """ + if not self.authenticated: + await self.login() + + m = await self.get_model() + try: + m.add_relation(a, b, via) + finally: + await m.disconnect() + + async def apply_config(self, config, application): + """Apply a configuration to the application.""" + self.log.debug("JujuApi: Applying configuration to {}.".format( + application + )) + return await self.set_config(application=application, config=config) + + async def deploy_application(self, charm, name="", path="", specs={}): + """ + Deploy an application. + + Deploy an application to a container or a machine already provisioned + by the OSM Resource Orchestrator (requires the Juju public ssh key + installed on the new machine via cloud-init). + + :param str charm: The name of the charm + :param str name: The name of the application, if different than the charm + :param str path: The path to the charm + :param dict machine: A dictionary identifying the machine to manage via Juju + Examples:: + + deploy_application(..., specs={'host': '10.0.0.4', 'user': 'ubuntu'}) + """ + if not self.authenticated: + await self.login() + + # Check that the charm is valid and exists. + if charm is None: + return None + + app = await self.get_application(name) + if app is None: + + # Check for specific machine placement + to = None + if all(k in specs for k in ['hostname', 'username']): + machine = await self.model.add_machine(spec='ssh:%@%'.format( + specs['host'], + specs['user'], + )) + to = machine.id + + # TODO: Handle the error if the charm isn't found. + self.log.debug("JujuApi: Deploying charm {} ({}) from {}".format( + charm, + name, + path, + to=to, + )) + app = await self.model.deploy( + path, + application_name=name, + series='xenial', + ) + return app + deploy_service = deploy_application + + async def get_action_status(self, uuid): + """Get the status of an action.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Waiting for status of action uuid {}".format(uuid)) + action = await self.model.wait_for_action(uuid) + return action.status + + async def get_application(self, application): + """Get the deployed application.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Getting application {}".format(application)) + app = None + if application and self.model: + if self.model.applications: + if application in self.model.applications: + app = self.model.applications[application] + return app + + async def get_application_status(self, application): + """Get the status of an application.""" + if not self.authenticated: + await self.login() + + status = None + app = await self.get_application(application) + if app: + status = app.status + self.log.debug("JujuApi: Status of application {} is {}".format( + application, + str(status), + )) + return status + get_service_status = get_application_status + + async def get_config(self, application): + """Get the configuration of an application.""" + if not self.authenticated: + await self.login() + + config = None + app = await self.get_application(application) + if app: + config = await app.get_config() + + self.log.debug("JujuApi: Config of application {} is {}".format( + application, + str(config), + )) + + return config + + async def get_model(self, name='default'): + """Get a model from the Juju Controller. + + Note: Model objects returned must call disconnected() before it goes + out of scope.""" + if not self.authenticated: + await self.login() + + model = Model() + + uuid = await self.get_model_uuid(name) + + self.log.debug("JujuApi: Connecting to model {} ({})".format( + model, + uuid, + )) + + await model.connect( + self.endpoint, + uuid, + self.user, + self.secret, + None, + ) + + return model + + async def get_model_uuid(self, name='default'): + """Get the UUID of a model. + + Iterate through all models in a controller and find the matching model. + """ + if not self.authenticated: + await self.login() + + uuid = None + + models = await self.controller.get_models() + + self.log.debug("JujuApi: Looking through {} models for model {}".format( + len(models.user_models), + name, + )) + for model in models.user_models: + if model.model.name == name: + uuid = model.model.uuid + break + + return uuid + + async def get_status(self): + """Get the model status.""" + if not self.authenticated: + await self.login() + + if not self.model: + self.model = self.get_model(self.model_name) + + class model_state: + applications = {} + machines = {} + relations = {} + + self.log.debug("JujuApi: Getting model status") + status = model_state() + status.applications = self.model.applications + status.machines = self.model.machines + + return status + + async def is_application_active(self, application): + """Check if the application is in an active state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['active']: + state = True + + self.log.debug("JujuApi: Application {} is {} active".format( + application, + "" if status else "not", + )) + + return state + is_service_active = is_application_active + + async def is_application_blocked(self, application): + """Check if the application is in a blocked state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['blocked']: + state = True + + self.log.debug("JujuApi: Application {} is {} blocked".format( + application, + "" if status else "not", + )) + + return state + is_service_blocked = is_application_blocked + + async def is_application_deployed(self, application): + """Check if the application is in a deployed state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['active']: + state = True + self.log.debug("JujuApi: Application {} is {} deployed".format( + application, + "" if status else "not", + )) + + return state + is_service_deployed = is_application_deployed + + async def is_application_error(self, application): + """Check if the application is in an error state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['error']: + state = True + self.log.debug("JujuApi: Application {} is {} errored".format( + application, + "" if status else "not", + )) + + return state + is_service_error = is_application_error + + async def is_application_maint(self, application): + """Check if the application is in a maintenance state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['maintenance']: + state = True + self.log.debug("JujuApi: Application {} is {} in maintenence".format( + application, + "" if status else "not", + )) + + return state + is_service_maint = is_application_maint + + async def is_application_up(self, application=None): + """Check if the application is up.""" + if not self.authenticated: + await self.login() + state = False + + status = await self.get_application_status(application) + if status and status in ['active', 'blocked']: + state = True + self.log.debug("JujuApi: Application {} is {} up".format( + application, + "" if status else "not", + )) + return state + is_service_up = is_application_up + + async def login(self): + """Login to the Juju controller.""" + if self.authenticated: + return + cacert = None + self.controller = Controller() + + self.log.debug("JujuApi: Logging into controller") + + if self.secret: + await self.controller.connect( + self.endpoint, + self.user, + self.secret, + cacert, + ) + else: + await self.controller.connect_current() + + self.authenticated = True + self.model = await self.get_model(self.model_name) + + async def logout(self): + """Logout of the Juju controller.""" + if not self.authenticated: + return + + if self.model: + await self.model.disconnect() + self.model = None + if self.controller: + await self.controller.disconnect() + self.controller = None + + self.authenticated = False + + async def remove_application(self, name): + """Remove the application.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(name) + if app: + self.log.debug("JujuApi: Destroying application {}".format( + name, + )) + + await app.destroy() + + async def remove_relation(self, a, b): + """ + Remove a relation between two application endpoints + + :param a An application endpoint + :param b An application endpoint + """ + if not self.authenticated: + await self.login() + + m = await self.get_model() + try: + m.remove_relation(a, b) + finally: + await m.disconnect() + + async def resolve_error(self, application=None): + """Resolve units in error state.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(application) + if app: + self.log.debug("JujuApi: Resolving errors for application {}".format( + application, + )) + + for unit in app.units: + app.resolved(retry=True) + + async def run_action(self, application, action_name, **params): + """Execute an action and return an Action object.""" + if not self.authenticated: + await self.login() + result = { + 'status': '', + 'action': { + 'tag': None, + 'results': None, + } + } + app = await self.get_application(application) + if app: + # We currently only have one unit per application + # so use the first unit available. + unit = app.units[0] + + self.log.debug("JujuApi: Running Action {} against Application {}".format( + action_name, + application, + )) + + action = await unit.run_action(action_name, **params) + + # Wait for the action to complete + await action.wait() + + result['status'] = action.status + result['action']['tag'] = action.data['id'] + result['action']['results'] = action.results + + return result + execute_action = run_action + + async def set_config(self, application, config): + """Apply a configuration to the application.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(application) + if app: + self.log.debug("JujuApi: Setting config for Application {}".format( + application, + )) + await app.set_config(config) + + # Verify the config is set + newconf = await app.get_config() + for key in config: + if config[key] != newconf[key]: + self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key])) + + + async def set_parameter(self, parameter, value, application=None): + """Set a config parameter for a service.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Setting {}={} for Application {}".format( + parameter, + value, + application, + )) + return await self.apply_config( + {parameter: value}, + application=application, + ) + + async def wait_for_application(self, name, timeout=300): + """Wait for an application to become active.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(name) + if app: + self.log.debug("JujuApi: Waiting {} seconds for Application {}".format( + timeout, + name, + )) + + await self.model.block_until( + lambda: all( + unit.agent_status == 'idle' + and unit.workload_status + in ['active', 'unknown'] for unit in app.units + ), + timeout=timeout, + ) + + +def get_argparser(): + parser = argparse.ArgumentParser(description='Test Driver for Juju API') + + ################### + # Authentication # + ################### + parser.add_argument( + "-s", "--server", + default='10.0.202.49', + help="Juju controller", + ) + parser.add_argument( + "-u", "--user", + default='admin', + help="User, default user-admin" + ) + parser.add_argument( + "-p", "--password", + default='', + help="Password for the user" + ) + parser.add_argument( + "-P", "--port", + default=17070, + help="Port number, default 17070" + ) + parser.add_argument( + "-m", "--model", + default='default', + help="The model to connect to." + ) + + ########## + # Charm # + ########## + parser.add_argument( + "-d", "--directory", + help="Local directory for the charm" + ) + parser.add_argument( + "--application", + help="Charm name" + ) + + ############# + # Placement # + ############# + + """ + To deploy to a non-Juju machine, provide the host and + credentials for Juju to manually provision (host, username, (password or key?)) + + """ + parser.add_argument( + "--proxy", + action='store_true', + help="Deploy as a proxy charm.", + ) + parser.add_argument( + "--no-proxy", + action='store_false', + dest='proxy', + help="Deploy as a full charm.", + ) + parser.set_defaults(proxy=True) + + # Test options? + # unit test? + ####### + # VNF # + ####### + + return parser.parse_args() + + +async def deploy_charm_and_wait(): + args = get_argparser() + + # Set logging level to debug so we can see verbose output from the + # juju library. + logging.basicConfig(level=logging.DEBUG) + + # Quiet logging from the websocket library. If you want to see + # everything sent over the wire, set this to DEBUG. + ws_logger = logging.getLogger('websockets.protocol') + ws_logger.setLevel(logging.INFO) + + """Here's an example of a coroutine that will deploy a charm and wait until + it's ready to be used.""" + api = JujuApi(server=args.server, + port=args.port, + user=args.user, + secret=args.password, + # loop=loop, + log=ws_logger, + model_name=args.model + ) + print("Logging in...") + await api.login() + + if api.authenticated: + status = await api.get_status() + print('Applications:', list(status.applications.keys())) + print('Machines:', list(status.machines.keys())) + + if args.directory and args.application: + + + + # Deploy the charm + charm = os.path.basename( + os.path.expanduser( + os.path.dirname(args.directory) + ) + ) + await api.deploy_application(charm, + name=args.application, + path=args.directory, + ) + + # Wait for the application to fully deploy. This will block until the + # agent is in an idle state, and the charm's workload is either + # 'active' or 'unknown', meaning it's ready but the author did not + # explicitly set a workload state. + print("Waiting for application '{}' to deploy...".format(charm)) + while (True): + # Deploy the charm and wait, periodically checking its status + await api.wait_for_application(charm, 30) + + error = await api.is_application_error(charm) + if error: + print("This application is in an error state.") + break + + blocked = await api.is_application_blocked(charm) + if blocked: + print("This application is blocked.") + break + + # An extra check to see if the charm is ready + up = await api.is_application_up(charm) + print("Application is {}".format("up" if up else "down")) + + print("Service {} is deployed".format(args.application)) + + ################################### + # Execute config on a proxy charm # + ################################### + config = await api.get_config(args.application) + hostname = config['ssh-username']['value'] + rhostname = hostname[::-1] + + # Apply the configuration + await api.apply_config( + {'ssh-username': rhostname}, application=args.application + ) + + # Get the configuration + config = await api.get_config(args.application) + + # Verify the configuration has been updated + assert(config['ssh-username']['value'] == rhostname) + + #################################### + # Get the status of an application # + #################################### + status = await api.get_application_status(charm) + print("Application Status: {}".format(status)) + + ########################### + # Execute a simple action # + ########################### + result = await api.run_action(charm, 'get-ssh-public-key') + print("Action {} status is {} and returned {}".format( + result['status'], + result['action']['tag'], + result['action']['results'] + )) + + ##################################### + # Execute an action with parameters # + ##################################### + result = await api.run_action(charm, 'run', command='hostname') + + print("Action {} status is {} and returned {}".format( + result['status'], + result['action']['tag'], + result['action']['results'] + )) + + print("Logging out...") + await api.logout() + api = None + +# get public key in juju controller? that can be pulled without need of a charm deployed and installed to vm via cloud-init + +if __name__ == "__main__": + # Create a single event loop for running code asyncronously. + loop = asyncio.get_event_loop() + + # An initial set of tasks to run + tasks = [ + deploy_charm_and_wait(), + ] + + # TODO: optionally run forever and use a Watcher to monitor what's happening + loop.run_until_complete(asyncio.wait(tasks)) diff --git a/lcm/lcm.py b/lcm/lcm.py index d183963f..6553b0a0 100644 --- a/lcm/lcm.py +++ b/lcm/lcm.py @@ -1,6 +1,17 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- + +import argparse +import glob +from juju_api import JujuApi +import mimetypes +import os +import os.path +import re +import shutil +import tarfile + import asyncio import aiohttp import yaml @@ -18,14 +29,130 @@ logging.basicConfig(format=streamformat, level=logging.DEBUG) logger = logging.getLogger('lcm') ro_account = { - "url": "http://localhost:9090/openmano", - "tenant": "osm" + "url": "http://10.105.129.121:9090/openmano", + "tenant": "osm", } vca_account = { - # TODO + "ip": "10.105.129.40", + "port": 17070, + "user": "admin", + "secret": "NzdjM2M4ODA5NjlhNzRkZGJhMzc2NjNk", } +# These functions are written to use the JujuApi class from juju_api.py, a +# drop-in copy of the one used in OSM today. This will make it easier to extend +# functionality in the LCM as it's added to the Juju API + + +def GetJujuApi(loop): + # Quiet logging from the websocket library. If you want to see + # everything sent over the wire, set this to DEBUG. + logging.basicConfig(level=logging.DEBUG) + + ws_logger = logging.getLogger('websockets.protocol') + ws_logger.setLevel(logging.INFO) + + api = JujuApi(server=vca_account['ip'], + port=vca_account['port'], + user=vca_account['user'], + secret=vca_account['secret'], + loop=loop, + log=ws_logger, + model_name='default' + ) + return api + + +def get_vnf_unique_name(nsr_name, vnfr_name, member_vnf_index): + """Get the unique VNF name. + Charm names accepts only a to z and non-consecutive - characters.""" + name = "{}-{}-{}".format(nsr_name, vnfr_name, member_vnf_index) + new_name = '' + for c in name: + if c.isdigit(): + c = chr(97 + int(c)) + elif not c.isalpha(): + c = "-" + new_name += c + return re.sub('\-+', '-', new_name.lower()) + + +async def DeployApplication(loop, application_name, charm_path, config): + """ + Deploy a charm. + + Deploy a VNF configuration charm from a local directory. + :param object loop: The event loop + :param str application_name: The unique name of this application. + :param str charm_path: The path to the charm. + + :Example: + + DeployApplication(loop, ".cache/ping_vnf/charm/pingpong", "ping_vnf") + """ + + api = GetJujuApi(loop) + + await api.login() + if api.authenticated: + charm = os.path.basename(charm_path) + + await api.deploy_application(charm, + name=application_name, + path=charm_path, + ) + await api.apply_config(config, application_name) + + # Wait for the application to fully deploy. This will block until the + # agent is in an idle state, and the charm's workload is either + # 'active' or 'unknown', meaning it's ready but the author did not + # explicitly set a workload state. + # print("Waiting for application '{}' to deploy...".format(charm)) + while (True): + # Deploy the charm and wait, periodically checking its status + await api.wait_for_application(charm, 30) + + error = await api.is_application_error(charm) + if error: + print("This application is in an error state.") + break + + blocked = await api.is_application_blocked(charm) + if blocked: + print("This application is blocked.") + break + + # An extra check to see if the charm is ready + up = await api.is_application_up(charm) + # print("Application is {}".format("up" if up else "down")) + + print("Application {} is deployed".format(args.application)) + await api.logout() + + +async def RemoveApplication(loop, application_name): + """ + Remove an application from the Juju Controller + + Removed the named application and it's charm from the Juju controller. + + :param object loop: The event loop. + :param str application_name: The unique name of the application. + + :Example: + + RemoveApplication(loop, "ping_vnf") + RemoveApplication(loop, "pong_vnf") + """ + api = GetJujuApi(loop) + + await api.login() + if api.authenticated: + print("Removing application {}".format(application_name)) + await api.remove_application(application_name) + await api.logout() + # conains created tasks/futures to be able to cancel lcm_tasks = {} @@ -150,19 +277,35 @@ async def CreateNS(loop, nsr_id): vnfd = db.get_one("vnfd", {"id": vnfd_id}) if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] - config_primitive = vnfd["vnf-configuration"].get("config-primitive") + # config_primitive = vnfd["vnf-configuration"].get("config-primitive") + initial_config_primitive = vnfd["vnf-configuration"].get("initial-config-primitive") # get parameters for juju charm base_folder = vnfd["_admin"]["storage"] path = base_folder + "/charms/" + proxy_charm mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] -<<<<<<< HEAD - # task = asyncio.ensure_future(DeployCharm(loop, path, mgmt_ip, config_primitive)) - pass - # TODO launch VCA charm -======= + # TODO launch VCA charm # task = asyncio.ensure_future(DeployCharm(loop, path, mgmt_ip, config_primitive)) ->>>>>>> 6ce7cc879dda3b729b10d563bd26df613dc9f70f + config = {} + for primitive in initial_config_primitive: + if primitive['name'] == 'config': + for parameter in primitive['parameter']: + param = parameter['name'] + if parameter['value'] == "": + config[param] = mgmt_ip + else: + config[param] = parameter['value'] + + task = asyncio.ensure_future( + DeployApplication( + loop, + get_vnf_unique_name(nsd_id, vnfd_id, vnfd_index), + path, + config, + ) + ) + + nsr_lcm["status"] = "DONE" db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) @@ -180,15 +323,23 @@ async def DestroyNS(loop, nsr_id): logger.debug("DestroyNS task nsr_id={} Enter".format(nsr_id)) nsr_lcm = db.get_one("nsr_lcm", {"id": nsr_id}) ns_request = db.get_one("ns_request", {"id": nsr_id}) + nsd_id = ns_request["nsd_id"] nsr_lcm["status"] = "DELETING" nsr_lcm["status_detailed"] = "Deleting charms" db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) -<<<<<<< HEAD + # TODO destroy charms -======= - # TODO destroy VCA charm ->>>>>>> 6ce7cc879dda3b729b10d563bd26df613dc9f70f + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd_index = int(c_vnf["member-vnf-index"]) + vnfd = db.get_one("vnfd", {"id": vnfd_id}) + if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): + RemoveApplication( + get_vnf_unique_name( + nsd_id, vnfd_id, vnfd_index + ) + ) # remove from RO RO = ROclient.ROClient(loop, endpoint_url=ro_account["url"], tenant=ro_account["tenant"], @@ -267,7 +418,6 @@ def cancel_tasks(loop, nsr_id): lcm_tasks[nsr_id] = {} - async def read_kafka(loop, bus_info): global lcm_tasks logger.debug("kafka task Enter") @@ -325,9 +475,9 @@ async def read_kafka(loop, bus_info): logger.debug("kafka task Exit") -def lcm(): +def lcm(kafka): loop = asyncio.get_event_loop() - loop.run_until_complete(read_kafka(loop, {"file": "/home/atierno/OSM/osm/NBI/kafka"})) + loop.run_until_complete(read_kafka(loop, {"file": kafka})) return @@ -348,27 +498,101 @@ def lcm2(): loop.close() +def get_argparser(): + parser = argparse.ArgumentParser() + + parser.add_argument( + '--vnfd', + nargs="+", + type=str, + action='append', + required=True, + ) + parser.add_argument( + '--nsd', + type=str, + required=True, + ) + + parser.add_argument( + '--kafka', + type=str, + required=True, + ) + + parser.add_argument( + '--datacenter', + type=str, + required=True, + default="OST2_MRT" + ) + args = parser.parse_args() + + # Quick hack to make this one list + vnfds = [] + for vnfd in args.vnfd: + vnfds += vnfd + args.vnfd = vnfds + + return args + + +# def find_yaml(path): +# """Find the first yaml file, rescursively, in the path.""" +# for filename in glob.iglob('path/**/.yaml'): +# print(filename) +# +# if __name__ == '__main__': - # FOR TEST - RO_VIM = "OST2_MRT" + args = get_argparser() + print(args) - #FILL DATABASE - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} - db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} - db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: - nsd = yaml.load(f) - nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) - nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} - db.create("nsd", nsd_clean) + # FOR TEST + RO_VIM = args.datacenter + + # Unpack the NSD/VNFD packages to a persistent on-disk cache + if os.path.exists('.cache'): + shutil.rmtree('.cache') + os.mkdir('.cache') + + for vnfd in args.vnfd: + if mimetypes.guess_type(vnfd)[0] == "application/x-tar": + with tarfile.open(vnfd) as tar: + tar.extractall('.cache/') + # The path is the root of our charm + vnfd_dir = "{}/.cache/{}".format( + os.path.dirname( + os.path.realpath(__file__) + ), + tar.getnames()[0] + ) + for entity in tar: + if entity.name.endswith('_vnfd.yaml'): + print("VNFD: {}/{}".format(".cache", entity.name)) + with open("{}/{}".format(".cache", entity.name)) as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": vnfd_dir} + db.create("vnfd", vnfd_clean) + + if mimetypes.guess_type(args.nsd)[0] == "application/x-tar": + with tarfile.open(args.nsd) as tar: + tar.extractall('.cache/') + + nsd_dir = "{}/.cache/{}".format( + os.path.dirname( + os.path.realpath(__file__) + ), + tar.getnames()[0] + ) + for entity in tar: + if entity.name.endswith('_nsd.yaml'): + with open("{}/{}".format(".cache", entity.name)) as f: + nsd = yaml.load(f) + nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + nsd_clean["_admin"] = {"storage": nsd_dir} + db.create("nsd", nsd_clean) ns_request = { "id": "ns1", @@ -387,4 +611,42 @@ if __name__ == '__main__': } db.create("ns_request", ns_request) # lcm2() - lcm() + lcm(args.kafka) + + pass + + #FILL DATABASE + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: + # vnfd = yaml.load(f) + # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} + # db.create("vnfd", vnfd_clean) + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: + # vnfd = yaml.load(f) + # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} + # db.create("vnfd", vnfd_clean) + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: + # nsd = yaml.load(f) + # nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + # nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} + # db.create("nsd", nsd_clean) + # + # ns_request = { + # "id": "ns1", + # "nsr_id": "ns1", + # "name": "pingpongOne", + # "vim": RO_VIM, + # "nsd_id": nsd_clean["id"], # nsd_ping_pong + # } + # db.create("ns_request", ns_request) + # ns_request = { + # "id": "ns2", + # "nsr_id": "ns2", + # "name": "pingpongTwo", + # "vim": RO_VIM, + # "nsd_id": nsd_clean["id"], # nsd_ping_pong + # } + # db.create("ns_request", ns_request) + # # lcm2() + # lcm()