# blob: 5e5d0de1f36cebdf92e41368111c88fdf22ea275 [file] [log] [blame]
"""
Copyright (c) 2015 SONATA-NFV and Paderborn University
ALL RIGHTS RESERVED.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written
permission.
This work has been performed in the framework of the SONATA project,
funded by the European Commission under Grant number 671517 through
the Horizon 2020 and 5G-PPP programmes. The authors would like to
acknowledge the contributions of their colleagues of the SONATA
partner consortium (www.sonata-nfv.eu).
"""
"""
Distributed Cloud Emulator (dcemulator)
(c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
"""
import logging
import threading
import zerorpc
import paramiko
import ipaddress
import time
import gevent
# Configure the root logger when this module is imported. At level INFO the
# logging.debug(...) calls below are filtered out; basicConfig() is a no-op
# if the root logger already has handlers configured.
logging.basicConfig(level=logging.INFO)
class ZeroRpcApiEndpoint(object):
    """
    Minimal API endpoint that exposes the emulator through zerorpc.

    The default command line client talks to this interface. It can
    also serve as a reference when implementing REST interfaces with
    the same semantics, e.g. something like the OpenStack compute API.
    """

    def __init__(self, listenip, port):
        """
        Remember the listen address; the server is not started here.

        :param listenip: IP address the zerorpc server will bind to
        :param port: TCP port the zerorpc server will bind to
        """
        # data center label -> data center object, see connectDatacenter()
        self.dcs = {}
        self.ip = listenip
        self.port = port
        endpoint = (self.__class__.__name__, self.ip, self.port)
        logging.debug("Created API endpoint %s(%s:%d)" % endpoint)

    def connectDatacenter(self, dc):
        """
        Register a data center under its label so RPC calls can reach it.

        :param dc: data center object; must provide a ``label`` attribute
        """
        self.dcs[dc.label] = dc
        details = (dc.label, self.__class__.__name__, self.ip, self.port)
        logging.info("Connected DC(%s) to API endpoint %s(%s:%d)" % details)

    def start(self):
        """
        Run the zerorpc server in a background daemon thread.
        """
        worker = threading.Thread(target=self._api_server_thread)
        # daemon thread: it must not keep the process alive on exit
        worker.daemon = True
        worker.start()
        endpoint = (self.__class__.__name__, self.ip, self.port)
        logging.debug("Started API endpoint %s(%s:%d)" % endpoint)

    def _api_server_thread(self):
        """
        Thread body: bind the zerorpc server and serve requests.
        """
        server = zerorpc.Server(MultiDatacenterApi(self.dcs))
        server.bind("tcp://%s:%d" % (self.ip, self.port))
        server.run()
class MultiDatacenterApi(object):
    """
    Just pass through the corresponding request to the
    selected data center. Do not implement provisioning
    logic here because we will have multiple API
    endpoint implementations at the end.

    Except for compute_profile, the RPC methods do not raise: when a
    call fails, the exception is logged and its message is returned to
    the client in place of the regular result.
    """

    # Flow match for the profiling traffic: IPv4 (dl_type=0x0800),
    # UDP (nw_proto=17), destination port 5001.
    _PROFILE_MATCH = 'dl_type=0x0800,nw_proto=17,udp_dst=5001'

    def __init__(self, dcs):
        """
        :param dcs: dict mapping data center labels to data center objects
        """
        self.dcs = dcs

    @staticmethod
    def _error_message(ex):
        """
        Return a printable message for an exception.

        ``ex.message`` only exists on Python 2 and is empty for
        exceptions created with zero or several arguments, so fall back
        to ``str(ex)`` whenever it is missing or empty.
        """
        return getattr(ex, 'message', None) or str(ex)

    @staticmethod
    def _interface_ip(status, intf_name):
        """
        Look up the IP of the interface ``intf_name`` in a status dict.

        :param status: dict with a 'network' list whose entries carry
                       the keys 'intf_name' and 'ip'
        :return: the 'ip' value of the first matching entry, else None
        """
        for nw in status.get('network'):
            if nw.get('intf_name') == intf_name:
                return nw['ip']
        return None

    def _set_profile_chains(self, net, compute_name, vnf_input, cmd):
        """
        Apply ``cmd`` ('add-flow' or 'del-flows') to the profiling chains
        psrc -> vnf and vnf -> psink.

        For each hop a bidirectional chain is set first, followed by a
        chain restricted to the profiling traffic and tagged with a
        cookie (10 for psrc -> vnf, 11 for vnf -> psink).
        """
        # NOTE(review): vnf_input is used as destination interface for
        # both hops, i.e. also on psink -- confirm this is intended.
        hops = (('psrc', compute_name, 10), (compute_name, 'psink', 11))
        for src, dst, cookie in hops:
            net.setChain(src, dst,
                         vnf_src_interface='output',
                         vnf_dst_interface=vnf_input,
                         cmd=cmd, weight=None, bidirectional=True)
            net.setChain(src, dst,
                         vnf_src_interface='output',
                         vnf_dst_interface=vnf_input,
                         cmd=cmd, weight=None,
                         match=self._PROFILE_MATCH,
                         cookie=cookie)

    def compute_action_start(self, dc_label, compute_name, image, network, command):
        """
        Start a new compute instance: A docker container (note: zerorpc does not support keyword arguments)
        :param dc_label: name of the DC
        :param compute_name: compute container name
        :param image: image name
        :param network: list of all interfaces of the vnf with their parameters,
                        e.g. [{"id": "input", "ip": "10.0.0.254/8"}, {"id": "output", "ip": "11.0.0.254/24"}]
        :param command: command to execute
        :return: result of the new instance's getStatus(), or the error
                 message if the instance could not be started
        """
        # TODO what to return UUID / given name / internal name ?
        logging.debug("RPC CALL: compute start")
        try:
            c = self.dcs.get(dc_label).startCompute(
                compute_name, image=image, command=command, network=network)
            return c.getStatus()
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_message(ex)

    def compute_action_stop(self, dc_label, compute_name):
        """
        Stop a compute instance.
        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: result of the data center's stopCompute(), or the error
                 message if stopping failed
        """
        logging.debug("RPC CALL: compute stop")
        try:
            return self.dcs.get(dc_label).stopCompute(compute_name)
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_message(ex)

    def compute_list(self, dc_label):
        """
        List compute instances.
        :param dc_label: name of the DC, or None to list the instances
                         of all connected DCs
        :return: list of (name, status) tuples, or the error message if
                 listing failed
        """
        logging.debug("RPC CALL: compute list")
        try:
            if dc_label is None:
                # collect the compute nodes of all DCs
                containers = []
                for dc in self.dcs.values():
                    containers += dc.listCompute()
            else:
                # only the compute nodes of the specified DC
                containers = self.dcs.get(dc_label).listCompute()
            return [(c.name, c.getStatus()) for c in containers]
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_message(ex)

    def compute_status(self, dc_label, compute_name):
        """
        Get the status of a single compute instance.
        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: result of the instance's getStatus(), or the error
                 message if the lookup failed
        """
        logging.debug("RPC CALL: compute status")
        try:
            return self.dcs.get(
                dc_label).containers.get(compute_name).getStatus()
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_message(ex)

    @zerorpc.stream
    def compute_profile(self, dc_label, compute_name, kwargs):
        """
        Profile a VNF: start it together with a traffic source ('psrc')
        and a traffic sink ('psink'), chain them and stream the results
        of the monitor agent's profile() call for each rate step.

        Unlike the other RPC methods, errors are not converted into a
        returned message here; they propagate to the caller.

        :param dc_label: name of the DC
        :param compute_name: name of the VNF container to profile
        :param kwargs: dict (zerorpc does not support keyword arguments)
                       with the keys 'image', 'network' and 'command'
                       (handed to compute_action_start) and 'input'
                       (destination interface name of the chains)
        :return: generator yielding one profile() result per rate step
        """
        vnf_input = kwargs.get('input')

        ## VIM/dummy gatekeeper's tasks:
        # start vnf
        vnf_status = self.compute_action_start(
            dc_label, compute_name,
            kwargs.get('image'),
            kwargs.get('network'),
            kwargs.get('command'))
        # start traffic source (with fixed ip address, no use for now...)
        psrc_status = self.compute_action_start(
            dc_label, 'psrc', 'profile_source', [{'id': 'output'}], None)
        # start traffic sink (with fixed ip address)
        psink_status = self.compute_action_start(
            dc_label, 'psink', 'profile_sink', [{'id': 'input'}], None)

        # link source -> vnf -> sink
        net = self.dcs.get(dc_label).net
        self._set_profile_chains(net, compute_name, vnf_input, 'add-flow')

        # need to wait a bit before containers are fully up?
        time.sleep(2)

        def generate():
            # The teardown sits in ``finally`` so that flows and
            # containers are removed after the last sample, and also when
            # profiling fails or the client stops consuming the stream.
            try:
                ## SSM/SP tasks:
                # compute_action_start returns an error string instead of
                # a dict on failure; the lookups below then raise and the
                # teardown still runs.
                psink_input_ip = self._interface_ip(psink_status, 'input')
                if psink_input_ip is None:
                    raise ValueError(
                        "traffic sink 'psink' reports no interface named 'input'")
                vnf_uuid = vnf_status['id']
                psrc_mgmt_ip = psrc_status['docker_network']

                # query the monitor agent once per rate step
                for rate in [0, 1, 2, 3]:
                    output_line = net.monitor_agent.profile(
                        psrc_mgmt_ip, rate, psink_input_ip, vnf_uuid)
                    # let other greenlets run before handing out the result
                    gevent.sleep(0)
                    yield output_line
            finally:
                ## VIM/dummy gatekeeper's tasks:
                # remove chain and vnfs
                self._set_profile_chains(
                    net, compute_name, vnf_input, 'del-flows')
                self.compute_action_stop(dc_label, compute_name)
                self.compute_action_stop(dc_label, 'psink')
                self.compute_action_stop(dc_label, 'psrc')

        return generate()

    def datacenter_list(self):
        """
        List all connected data centers.
        :return: list with the getStatus() result of every DC, or the
                 error message if listing failed
        """
        logging.debug("RPC CALL: datacenter list")
        try:
            return [d.getStatus() for d in self.dcs.values()]
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_message(ex)

    def datacenter_status(self, dc_label):
        """
        Get the status of a single data center.
        :param dc_label: name of the DC
        :return: result of the DC's getStatus(), or the error message if
                 the lookup failed
        """
        logging.debug("RPC CALL: datacenter status")
        try:
            return self.dcs.get(dc_label).getStatus()
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_message(ex)