if len(network) < 1:
network.append({})
+ """
# allocate in resource resource model and compute resource limits for new container
cpu_limit = mem_limit = disk_limit = -1
cpu_period = cpu_quota = None
if mem_limit < 4:
mem_limit = 4
LOG.warning("Increased MEM limit for %r because it was less than 4.0 MB." % name)
+ """
# create the container
d = self.net.addDocker(
"%s" % (name),
dimage=image,
dcmd=command,
datacenter=self,
- flavor_name=flavor_name,
- cpu_period=int(cpu_period) if cpu_limit > 0 else None, # set cpu limits if needed
- cpu_quota=int(cpu_quota) if cpu_limit > 0 else None,
- #mem_limit="%dm" % int(mem_limit) if mem_limit > 0 else None, # set mem limits if needed
- #memswap_limit="%dm" % int(mem_limit) if mem_limit > 0 else None # lets set swap to mem limit for now
+ flavor_name=flavor_name
)
+
+ # apply resource limits to container if a resource model is defined
+ if self._resource_model is not None:
+ self._resource_model.allocate(d)
+
# connect all given networks
# if no --net option is given, network = [{}], so 1 empty dict in the list
# this results in 1 default interface with a default ip address
# do bookkeeping
self.containers[name] = d
+ # TODO re-enable logging
+ """
# write resource log if a path is given
if self.resource_log_path is not None:
l = dict()
# append to logfile
with open(self.resource_log_path, "a") as f:
f.write("%s\n" % json.dumps(l))
+ """
return d # we might use UUIDs for naming later on
def stopCompute(self, name):
if name not in self.containers:
raise Exception("Container with name %s not found." % name)
LOG.debug("Stopping compute instance %r in data center %r" % (name, str(self)))
+
+ # call resource model and free resources
+ if self._resource_model is not None:
+ self._resource_model.free(self.containers[name])
+
+ # remove links
self.net.removeLink(
link=None, node1=self.containers[name], node2=self.switch)
+
+ # remove container
self.net.removeDocker("%s" % (name))
del self.containers[name]
- # call resource model and free resources
- if self._resource_model is not None:
- self._resource_model.free(name)
+ # TODO re-enable logging
+ """
# write resource log if a path is given
if self.resource_log_path is not None:
l = dict()
# append to logfile
with open(self.resource_log_path, "a") as f:
f.write("%s\n" % json.dumps(l))
+ """
return True
def listCompute(self):
self._initDefaultFlavors()
self.registrar = None # pointer to registrar
self.dcs = list()
- self.allocated_compute_instances = dict()
+ self._allocated_compute_instances = dict()
LOG.info("Resource model %r initialized" % self)
def __repr__(self):
raise Exception("Flavor with name %r already exists!" % fl.name)
self._flavors[fl.name] = fl
- def allocate(self, name, flavor_name):
+ def allocate(self, d):
"""
This method has to be overwritten by a real resource model.
- :param name: Name of the started compute instance.
- :param flavor_name: Name of the flavor to be allocated.
- :return: 3-tuple: (CPU-fraction, Mem-limit, Disk-limit)
+ :param d: Container object
"""
- LOG.warning("Allocating in BaseResourceModel: %r with flavor: %r" % (name, flavor_name))
- self.allocated_compute_instances[name] = flavor_name
- return -1.0, -1.0, -1.0 # return invalid values to indicate that this RM is a dummy
+ LOG.warning("Allocating in BaseResourceModel: %r with flavor: %r" % (d.name, d.flavor_name))
+ self._allocated_compute_instances[d.name] = d.flavor_name
- def free(self, name):
+ def free(self, d):
"""
This method has to be overwritten by a real resource model.
- :param name: Name of the compute instance that is stopped.
- :return: True/False
+ :param d: Container object
"""
- LOG.warning("Free in BaseResourceModel: %r" % name)
- del self.allocated_compute_instances[name]
- return True
+ LOG.warning("Free in BaseResourceModel: %r" % d.name)
+ del self._allocated_compute_instances[d.name]
def get_state_dict(self):
"""
lifetime.
"""
- def __init__(self, max_cu=32, max_mu=1024):
+ def __init__(self, max_cu=32, max_mu=1024,
+ deactivate_cpu_limit=False,
+ deactivate_mem_limit=False):
"""
Initialize model.
:param max_cu: Maximum number of compute units available in this DC.
self.dc_max_mu = max_mu
self.dc_alloc_cu = 0
self.dc_alloc_mu = 0
- self.cu = 0
- self.mu = 0
+ self.deactivate_cpu_limit = deactivate_cpu_limit
+ self.deactivate_mem_limit = deactivate_mem_limit
super(self.__class__, self).__init__()
- def allocate(self, name, flavor_name):
+ def allocate(self, d):
"""
- Calculate resources for container with given flavor.
- :param name: Container name.
- :param flavor_name: Flavor name.
+ Allocate resources for the given container.
+ Defined by d.flavor_name
+ :param d: container
:return:
"""
- # bookkeeping and flavor handling
- if flavor_name not in self._flavors:
- raise Exception("Flavor %r does not exist" % flavor_name)
- fl = self._flavors.get(flavor_name)
- self.allocated_compute_instances[name] = flavor_name
- # calc and return
- return self._allocate_cpu(fl), self._allocate_mem(fl), -1.0 # return 3tuple (cpu, memory, disk)
+ self._allocated_compute_instances[d.name] = d
+ if not self.deactivate_cpu_limit:
+ self._allocate_cpu(d)
+ if not self.deactivate_mem_limit:
+ self._allocate_mem(d)
+ self._apply_limits()
- def free(self, name):
+ def _allocate_cpu(self, d):
"""
- Free resources of given container.
- :param name: Container name.
+ Actually allocate (bookkeeping)
+ :param d: container
:return:
"""
- if name not in self.allocated_compute_instances:
- return False
- # bookkeeping
- self._free_cpu(self._flavors.get(self.allocated_compute_instances[name]))
- self._free_mem(self._flavors.get(self.allocated_compute_instances[name]))
- del self.allocated_compute_instances[name]
- # we don't have to calculate anything special here in this simple model
- return True
+ fl_cu = self._get_flavor(d).get("compute")
+ # check for over provisioning
+ if self.dc_alloc_cu + fl_cu > self.dc_max_cu:
+ raise Exception("Not enough compute resources left.")
+ self.dc_alloc_cu += fl_cu
- def get_state_dict(self):
+ def _allocate_mem(self, d):
"""
- Return the state of the resource model as simple dict.
- Helper method for logging functionality.
+ Actually allocate (bookkeeping)
+ :param d: container
:return:
"""
- r = dict()
- r["e_cpu"] = self.registrar.e_cpu
- r["e_mem"] = self.registrar.e_mem
- r["dc_max_cu"] = self.dc_max_cu
- r["dc_max_mu"] = self.dc_max_mu
- r["dc_alloc_cu"] = self.dc_alloc_cu
- r["dc_alloc_mu"] = self.dc_alloc_mu
- r["cu_cpu_percentage"] = self.cu
- r["mu_mem_percentage"] = self.mu
- r["allocated_compute_instances"] = self.allocated_compute_instances
- return r
+ fl_mu = self._get_flavor(d).get("memory")
+ # check for over provisioning
+ if self.dc_alloc_mu + fl_mu > self.dc_max_mu:
+ raise Exception("Not enough memory resources left.")
+ self.dc_alloc_mu += fl_mu
- def _allocate_cpu(self, flavor):
+ def free(self, d):
"""
- Allocate CPU time.
- :param flavor: flavor dict
- :return: cpu time fraction
+ Free resources allocated to the given container.
+ :param d: container
+ :return:
"""
- fl_cu = flavor.get("compute")
- # check for over provisioning
- if self.dc_alloc_cu + fl_cu > self.dc_max_cu:
- raise Exception("Not enough compute resources left.")
- self.dc_alloc_cu += fl_cu
+ del self._allocated_compute_instances[d.name]
+ if not self.deactivate_cpu_limit:
+ self._free_cpu(d)
+ if not self.deactivate_mem_limit:
+ self._free_mem(d)
+ self._apply_limits()
+
+ def _free_cpu(self, d):
+ """
+ Free resources.
+ :param d: container
+ :return:
+ """
+ self.dc_alloc_cu -= self._get_flavor(d).get("compute")
+
+ def _free_mem(self, d):
+ """
+ Free resources.
+ :param d: container
+ :return:
+ """
+ self.dc_alloc_mu -= self._get_flavor(d).get("memory")
+
+ def _apply_limits(self):
+ """
+ Recalculate real resource limits for all allocated containers and apply them
+ to their cgroups.
+ We have to recalculate for all to allow e.g. overprovisioning models.
+ :return:
+ """
+ for d in self._allocated_compute_instances.itervalues():
+ if not self.deactivate_cpu_limit:
+ self._apply_cpu_limits(d)
+ if not self.deactivate_mem_limit:
+ self._apply_mem_limits(d)
+
+ def _apply_cpu_limits(self, d):
+ """
+ Calculate real CPU limit (CFS bandwidth) and apply.
+ :param d: container
+ :return:
+ """
+ number_cu = self._get_flavor(d).get("compute")
# get cpu time fraction for entire emulation
e_cpu = self.registrar.e_cpu
# calculate cpu time fraction of a single compute unit
- self.cu = float(e_cpu) / sum([rm.dc_max_cu for rm in list(self.registrar.resource_models)])
+ single_cu = float(e_cpu) / sum([rm.dc_max_cu for rm in list(self.registrar.resource_models)])
# calculate cpu time fraction for container with given flavor
- return self.cu * fl_cu
+ cpu_time_percentage = single_cu * number_cu
+ # calculate cpu period and quota for CFS
+ # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
+ # Attention minimum cpu_quota is 1ms (micro)
+ cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
+ cpu_quota = cpu_period * cpu_time_percentage # calculate the fraction of cpu time for this container
+ # ATTENTION >= 1000 to avoid a invalid argument system error ... no idea why
+ if cpu_quota < 1000:
+ cpu_quota = 1000
+ LOG.warning("Increased CPU quota for %r to avoid system error." % d.name)
+ # apply to container if changed
+ if d.cpu_period != cpu_period or d.cpu_quota != cpu_quota:
+ LOG.debug("Setting CPU limit for %r: cpu_quota = cpu_period * limit = %f * %f = %f" % (
+ d.name, cpu_period, cpu_time_percentage, cpu_quota))
+ d.updateCpuLimit(cpu_period=int(cpu_period), cpu_quota=int(cpu_quota))
- def _free_cpu(self, flavor):
+ def _apply_mem_limits(self, d):
"""
- Free CPU allocation.
- :param flavor: flavor dict
+ Calculate real mem limit and apply.
+ :param d: container
:return:
"""
- self.dc_alloc_cu -= flavor.get("compute")
+ number_mu = self._get_flavor(d).get("memory")
+ # get memory amount for entire emulation
+ e_mem = self.registrar.e_mem
+ # calculate amount of memory for a single mu
+ single_mu = float(e_mem) / sum([rm.dc_max_mu for rm in list(self.registrar.resource_models)])
+ # calculate mem for given flavor
+ mem_limit = single_mu * number_mu
+ # ATTENTION minimum mem_limit per container is 4MB
+ if mem_limit < 4:
+ mem_limit = 4
+ LOG.warning("Increased MEM limit for %r because it was less than 4.0 MB." % name)
+ # to byte!
+ mem_limit = int(mem_limit*1024*1024)
+ # apply to container if changed
+ if d.mem_limit != mem_limit:
+ LOG.debug("Setting MEM limit for %r: mem_limit = %f MB" % (d.name, mem_limit/1024/1024))
+ d.updateMemoryLimit(mem_limit=mem_limit)
- def _allocate_mem(self, flavor):
+ def get_state_dict(self):
"""
- Allocate mem.
- :param flavor: flavor dict
- :return: mem limit in MB
+ Return the state of the resource model as simple dict.
+ Helper method for logging functionality.
+ :return:
"""
- fl_mu = flavor.get("memory")
- # check for over provisioning
- if self.dc_alloc_mu + fl_mu > self.dc_max_mu:
- raise Exception("Not enough memory resources left.")
- self.dc_alloc_mu += fl_mu
- # get cpu time fraction for entire emulation
- e_mem = self.registrar.e_mem
- # calculate cpu time fraction of a single compute unit
- self.mu = float(e_mem) / sum([rm.dc_max_mu for rm in list(self.registrar.resource_models)])
- # calculate cpu time fraction for container with given flavor
- return self.mu * fl_mu
+ # TODO update
+ r = dict()
+ r["e_cpu"] = self.registrar.e_cpu
+ r["e_mem"] = self.registrar.e_mem
+ r["dc_max_cu"] = self.dc_max_cu
+ r["dc_max_mu"] = self.dc_max_mu
+ r["dc_alloc_cu"] = self.dc_alloc_cu
+ r["dc_alloc_mu"] = self.dc_alloc_mu
+ r["cu_cpu_percentage"] = -1
+ r["mu_mem_percentage"] = -1
+ r["allocated_compute_instances"] = None #self._allocated_compute_instances
+ return r
- def _free_mem(self, flavor):
+ def _get_flavor(self, d):
"""
- Free memory allocation
- :param flavor: flavor dict
+ Get flavor assigned to given container.
+ Identified by d.flavor_name.
+ :param d: container
:return:
"""
- self.dc_alloc_mu -= flavor.get("memory")
+ if d.flavor_name not in self._flavors:
+ raise Exception("Flavor %r does not exist" % d.flavor_name)
+ return self._flavors.get(d.flavor_name)
# run API endpoint server (in another thread, don't block)
zapi1.start()
- # add the SONATA dummy gatekeeper to each DC
- sdkg1 = SonataDummyGatekeeperEndpoint("0.0.0.0", 8000)
- sdkg1.connectDatacenter(dc1)
- sdkg1.connectDatacenter(dc2)
- # run the dummy gatekeeper (in another thread, don't block)
- sdkg1.start()
-
# start the emulation platform
net.start()
print "Wait a moment and allocate some compute start some compute resources..."
r.addFlavour(f)
self.assertTrue("test" in r._flavors)
self.assertTrue(r._flavors.get("test").get("testmetric") == 42)
- # test if allocate and free runs through
- self.assertTrue(len(r.allocate("testc", "tiny")) == 3) # expected: 3tuple
- self.assertTrue(r.free("testc"))
def testAddRmToDc(self):
"""
self.assertTrue(len(self.net.rm_registrar.resource_models) == 1)
# check if alloc was called during startCompute
- self.assertTrue(len(r.allocated_compute_instances) == 0)
+ self.assertTrue(len(r._allocated_compute_instances) == 0)
self.dc[0].startCompute("tc1")
time.sleep(1)
- self.assertTrue(len(r.allocated_compute_instances) == 1)
+ self.assertTrue(len(r._allocated_compute_instances) == 1)
# check if free was called during stopCompute
self.dc[0].stopCompute("tc1")
- self.assertTrue(len(r.allocated_compute_instances) == 0)
+ self.assertTrue(len(r._allocated_compute_instances) == 0)
# check connectivity by using ping
self.assertTrue(self.net.ping([self.h[0], self.h[1]]) <= 0.0)
# stop Mininet network
self.stopNet()
+def createDummyContainerObject(name, flavor):
+
+ class DummyContainer(object):
+
+ def __init__(self):
+ self.cpu_period = -1
+ self.cpu_quota = -1
+ self.mem_limit = -1
+ self.memswap_limit = -1
+
+ def updateCpuLimit(self, cpu_period, cpu_quota):
+ self.cpu_period = cpu_period
+ self.cpu_quota = cpu_quota
+
+ def updateMemoryLimit(self, mem_limit):
+ self.mem_limit = mem_limit
+
+ d = DummyContainer()
+ d.name = name
+ d.flavor_name = flavor
+ return d
+
+
+
+
class testUpbSimpleCloudDcRM(SimpleTestTopology):
"""
Test the UpbSimpleCloudDc resource model.
rm = UpbSimpleCloudDcRM(max_cu=MAX_CU, max_mu=MAX_MU)
reg.register("test_dc", rm)
- res = rm.allocate("c1", "tiny") # calculate allocation
- self.assertEqual(res[0], E_CPU / MAX_CU * 0.5) # validate compute result
- self.assertEqual(res[1], float(E_MEM) / MAX_MU * 32) # validate memory result
- self.assertTrue(res[2] < 0) # validate disk result
+ c1 = createDummyContainerObject("c1", flavor="tiny")
+ rm.allocate(c1) # calculate allocation
+ self.assertEqual(float(c1.cpu_quota) / c1.cpu_period, E_CPU / MAX_CU * 0.5) # validate compute result
+ self.assertEqual(float(c1.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 32) # validate memory result
+
+ c2 = createDummyContainerObject("c2", flavor="small")
+ rm.allocate(c2) # calculate allocation
+ self.assertEqual(float(c2.cpu_quota) / c2.cpu_period, E_CPU / MAX_CU * 1) # validate compute result
+ self.assertEqual(float(c2.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 128) # validate memory result
+
+
+ c3 = createDummyContainerObject("c3", flavor="medium")
+ res = rm.allocate(c3) # calculate allocation
+ self.assertEqual(float(c3.cpu_quota) / c3.cpu_period, E_CPU / MAX_CU * 4) # validate compute result
+ self.assertEqual(float(c3.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 256) # validate memory result
+
- res = rm.allocate("c2", "small") # calculate allocation
- self.assertEqual(res[0], E_CPU / MAX_CU * 1) # validate compute result
- self.assertEqual(res[1], float(E_MEM) / MAX_MU * 128) # validate memory result
- self.assertTrue(res[2] < 0) # validate disk result
+ c4 = createDummyContainerObject("c4", flavor="large")
+ rm.allocate(c4) # calculate allocation
+ self.assertEqual(float(c4.cpu_quota) / c4.cpu_period, E_CPU / MAX_CU * 8) # validate compute result
+ self.assertEqual(float(c4.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 512) # validate memory result
- res = rm.allocate("c3", "medium") # calculate allocation
- self.assertEqual(res[0], E_CPU / MAX_CU * 4) # validate compute result
- self.assertEqual(res[1], float(E_MEM) / MAX_MU * 256) # validate memory result
- self.assertTrue(res[2] < 0) # validate disk result
- res = rm.allocate("c4", "large") # calculate allocation
- self.assertEqual(res[0], E_CPU / MAX_CU * 8) # validate compute result
- self.assertEqual(res[1], float(E_MEM) / MAX_MU * 512) # validate memory result
- self.assertTrue(res[2] < 0) # validate disk result
+ c5 = createDummyContainerObject("c5", flavor="xlarge")
+ rm.allocate(c5) # calculate allocation
+ self.assertEqual(float(c5.cpu_quota) / c5.cpu_period, E_CPU / MAX_CU * 16) # validate compute result
+ self.assertEqual(float(c5.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 1024) # validate memory result
- res = rm.allocate("c5", "xlarge") # calculate allocation
- self.assertEqual(res[0], E_CPU / MAX_CU * 16) # validate compute result
- self.assertEqual(res[1], float(E_MEM) / MAX_MU * 1024) # validate memory result
- self.assertTrue(res[2] < 0) # validate disk result
def testAllocationCpuLimit(self):
"""
# test over provisioning exeption
exception = False
try:
- rm.allocate("c6", "xlarge") # calculate allocation
- rm.allocate("c7", "xlarge") # calculate allocation
- rm.allocate("c8", "xlarge") # calculate allocation
- rm.allocate("c9", "xlarge") # calculate allocation
+ c6 = createDummyContainerObject("c6", flavor="xlarge")
+ c7 = createDummyContainerObject("c7", flavor="xlarge")
+ c8 = createDummyContainerObject("c8", flavor="xlarge")
+ c9 = createDummyContainerObject("c9", flavor="xlarge")
+ rm.allocate(c6) # calculate allocation
+ rm.allocate(c7) # calculate allocation
+ rm.allocate(c8) # calculate allocation
+ rm.allocate(c9) # calculate allocation
except Exception as e:
self.assertIn("Not enough compute", e.message)
exception = True
# test over provisioning exeption
exception = False
try:
- rm.allocate("c6", "xlarge") # calculate allocation
- rm.allocate("c7", "xlarge") # calculate allocation
- rm.allocate("c8", "xlarge") # calculate allocation
+ c6 = createDummyContainerObject("c6", flavor="xlarge")
+ c7 = createDummyContainerObject("c7", flavor="xlarge")
+ c8 = createDummyContainerObject("c8", flavor="xlarge")
+ rm.allocate(c6) # calculate allocation
+ rm.allocate(c7) # calculate allocation
+ rm.allocate(c8) # calculate allocation
except Exception as e:
self.assertIn("Not enough memory", e.message)
exception = True
reg = ResourceModelRegistrar(dc_emulation_max_cpu=1.0, dc_emulation_max_mem=512)
rm = UpbSimpleCloudDcRM(max_cu=100, max_mu=100)
reg.register("test_dc", rm)
- rm.allocate("c1", "tiny") # calculate allocation
+ c1 = createDummyContainerObject("c6", flavor="tiny")
+ rm.allocate(c1) # calculate allocation
self.assertTrue(rm.dc_alloc_cu == 0.5)
- rm.free("c1")
+ rm.free(c1)
self.assertTrue(rm.dc_alloc_cu == 0)
def testInRealTopo(self):
self.assertTrue(len(self.net.rm_registrar.resource_models) == 1)
# check if alloc was called during startCompute
- self.assertTrue(len(r.allocated_compute_instances) == 0)
+ self.assertTrue(len(r._allocated_compute_instances) == 0)
tc1 = self.dc[0].startCompute("tc1", flavor_name="tiny")
time.sleep(1)
- self.assertTrue(len(r.allocated_compute_instances) == 1)
+ self.assertTrue(len(r._allocated_compute_instances) == 1)
# check if there is a real limitation set for containers cgroup
- self.assertEqual(tc1.cpu_period/tc1.cpu_quota, 100)
+ self.assertEqual(float(tc1.cpu_quota)/tc1.cpu_period, 0.005)
# check if free was called during stopCompute
self.dc[0].stopCompute("tc1")
- self.assertTrue(len(r.allocated_compute_instances) == 0)
+ self.assertTrue(len(r._allocated_compute_instances) == 0)
# check connectivity by using ping
self.assertTrue(self.net.ping([self.h[0], self.h[1]]) <= 0.0)
# stop Mininet network