1. The chain_api was never terminated. This was fixed by turning the
server into an pyWSGI instance.
2. no monkey patching was applied. The monkey patching of gevent is
required in order to be able to handle other events during a
time.sleep() call. Since multiple patching is detected, it was added to
all files which create WSGI servers.
All in all this change fixes a large leak of threads, open files and
performance.
This change updates Containernet in order to fix race conditions which
otherwise happen due to gevent's monkey patching.
Change-Id: Ia45ad07db1f85046bfcac85eaca20c930b931141
Signed-off-by: schillinge <ablu@mail.uni-paderborn.de>
# install containernet (using its Ansible playbook)
# Attention: Containernet installation fixed to specific commit. Change to update to latest Containernet version.
RUN git clone https://github.com/containernet/containernet.git && \
# install containernet (using its Ansible playbook)
# Attention: Containernet installation fixed to specific commit. Change to update to latest Containernet version.
RUN git clone https://github.com/containernet/containernet.git && \
- (cd containernet && git checkout bc269d6f1cf9f50f71fda65c25fe1f2f4c1573b7)
+ (cd containernet && git checkout 6fcee82e192c8c0e6447650d6f512842185529ee)
WORKDIR /containernet/ansible
RUN ansible-playbook -i "localhost," -c local --skip-tags "notindocker" install.yml
WORKDIR /containernet/ansible
RUN ansible-playbook -i "localhost," -c local --skip-tags "notindocker" install.yml
import logging
import copy
import logging
import copy
+from gevent import monkey
+from gevent.pywsgi import WSGIServer
+
from mininet.node import OVSSwitch
from flask import Flask
from flask import Response, request
from flask_restful import Api, Resource
from mininet.node import OVSSwitch
from flask import Flask
from flask import Response, request
from flask_restful import Api, Resource
class ChainApi(Resource):
"""
class ChainApi(Resource):
"""
resource_class_kwargs={'api': self})
self.api.add_resource(QueryTopology, "/v1/topo",
resource_class_kwargs={'api': self})
resource_class_kwargs={'api': self})
self.api.add_resource(QueryTopology, "/v1/topo",
resource_class_kwargs={'api': self})
- self.api.add_resource(Shutdown, "/shutdown")
@self.app.after_request
def add_access_control_header(response):
@self.app.after_request
def add_access_control_header(response):
def _start_flask(self):
logging.info("Starting %s endpoint @ http://%s:%d" %
("ChainDummyApi", self.ip, self.port))
def _start_flask(self):
logging.info("Starting %s endpoint @ http://%s:%d" %
("ChainDummyApi", self.ip, self.port))
- if self.app is not None:
- self.app.before_request(self.dump_playbook)
- self.app.run(self.ip, self.port, debug=True, use_reloader=False)
+ self.http_server = WSGIServer(
+ (self.ip, self.port),
+ self.app,
+ log=open("/dev/null", "w") # don't show http logs
+ )
+ self.http_server.serve_forever(stop_timeout=1)
+ logging.info('Stopped %s' % self.__class__.__name__)
+
+ def stop(self):
+ if self.http_server:
+ logging.info('Stopping %s' % self.__class__.__name__)
+ self.http_server.stop(timeout=1)
def dump_playbook(self):
with self.manage.lock:
def dump_playbook(self):
with self.manage.lock:
logfile.write(data + "\n")
logfile.write(data + "\n")
-class Shutdown(Resource):
- def get(self):
- logging.debug(("%s is beeing shut down") % (__name__))
- func = request.environ.get('werkzeug.server.shutdown')
- if func is None:
- raise RuntimeError('Not running with the Werkzeug Server')
- func()
-
-
class ChainVersionsList(Resource):
'''
Entrypoint to find versions of the chain api.
class ChainVersionsList(Resource):
'''
Entrypoint to find versions of the chain api.
# dependent!
self.chain = chain_api.ChainApi(ip, port, self)
self.thread = threading.Thread(target=self.chain._start_flask, args=())
# dependent!
self.chain = chain_api.ChainApi(ip, port, self)
self.thread = threading.Thread(target=self.chain._start_flask, args=())
- self.thread.daemon = True
self.thread.name = self.chain.__class__
self.thread.start()
self.thread.name = self.chain.__class__
self.thread.start()
self.floating_intf = None
self.floating_links = dict()
self.floating_intf = None
self.floating_links = dict()
+ def stop(self):
+ self.chain.stop()
+ self.thread.join()
+
@property
def net(self):
return self._net
@property
def net(self):
return self._net
from openstack_dummies.nova_dummy_api import NovaDummyApi
import logging
from openstack_dummies.nova_dummy_api import NovaDummyApi
import logging
import compute
import socket
import time
import compute
import socket
import time
for c in self.openstack_endpoints.values():
c.compute = self.compute
c.manage = self.manage
for c in self.openstack_endpoints.values():
c.compute = self.compute
c.manage = self.manage
- c.server_thread = threading.Thread(target=c._start_flask, args=())
- c.server_thread.daemon = True
- c.server_thread.name = c.__class__.__name__
- c.server_thread.start()
if wait_for_port:
self._wait_for_port(c.ip, c.port)
if wait_for_port:
self._wait_for_port(c.ip, c.port)
"""
for c in self.openstack_endpoints.values():
c.stop()
"""
for c in self.openstack_endpoints.values():
c.stop()
- # for c in self.openstack_endpoints.values():
- # if c.server_thread:
- # print("Waiting for WSGIServers to be stopped ...")
- # c.server_thread.join()
+ for c in self.openstack_endpoints.values():
+ if c.server_thread:
+ c.server_thread.join()
+ self.manage.stop()
def _wait_for_port(self, ip, port):
for i in range(0, 10):
def _wait_for_port(self, ip, port):
for i in range(0, 10):
# the Horizon 2020 and 5G-PPP programmes. The authors would like to
# acknowledge the contributions of their colleagues of the SONATA
# partner consortium (www.sonata-nfv.eu).
# the Horizon 2020 and 5G-PPP programmes. The authors would like to
# acknowledge the contributions of their colleagues of the SONATA
# partner consortium (www.sonata-nfv.eu).
+import logging
+
+import threading
from flask import Flask, request
from flask_restful import Api, Resource
from flask import Flask, request
from flask_restful import Api, Resource
+from gevent import monkey
from gevent.pywsgi import WSGIServer
from gevent.pywsgi import WSGIServer
LOG = logging.getLogger("api.openstack.base")
LOG = logging.getLogger("api.openstack.base")
self.app = Flask(__name__)
self.api = Api(self.app)
self.app = Flask(__name__)
self.api = Api(self.app)
+ def start(self):
+ self.server_thread = threading.Thread(target=self._start_flask, args=())
+ self.server_thread.name = self.__class__.__name__
+ self.server_thread.start()
+
def stop(self):
if self.http_server:
def stop(self):
if self.http_server:
- self.http_server.stop(timeout=1.0)
+ LOG.info('Stopping %s' % self.__class__.__name__)
+ self.http_server.stop(timeout=1)
def _start_flask(self):
LOG.info("Starting %s endpoint @ http://%s:%d" % (
def _start_flask(self):
LOG.info("Starting %s endpoint @ http://%s:%d" % (
self.app,
log=open("/dev/null", "w") # don't show http logs
)
self.app,
log=open("/dev/null", "w") # don't show http logs
)
- self.http_server.serve_forever(stop_timeout=1.0)
+ self.http_server.serve_forever(stop_timeout=1)
+ LOG.info('Stopped %s' % self.__class__.__name__)
def dump_playbook(self):
with self.manage.lock:
def dump_playbook(self):
with self.manage.lock:
import threading
from flask import Flask
from flask_restful import Api
import threading
from flask import Flask
from flask_restful import Api
+from gevent import monkey
from gevent.pywsgi import WSGIServer
# need to import total module to set its global variable dcs
from gevent.pywsgi import WSGIServer
# need to import total module to set its global variable dcs
import pkg_resources
from os import path
import pkg_resources
from os import path
# the Horizon 2020 and 5G-PPP programmes. The authors would like to
# acknowledge the contributions of their colleagues of the SONATA
# partner consortium (www.sonata-nfv.eu).
# the Horizon 2020 and 5G-PPP programmes. The authors would like to
# acknowledge the contributions of their colleagues of the SONATA
# partner consortium (www.sonata-nfv.eu).
+
+from gevent import monkey
+monkey.patch_all() # noqa: because otherwise pep complains about code before imports
+
import unittest
import os
import subprocess
import unittest
import os
import subprocess