From d18559d4cf08bc987272f3ac2c5ccd0b443398db Mon Sep 17 00:00:00 2001 From: peusterm Date: Sat, 16 Apr 2016 04:59:23 +0200 Subject: [PATCH] Re-wrote resource model API and UPB simple resource model. Due to an update of Dockernet, we can now change resource limits at runtime. --- src/emuvim/dcemulator/node.py | 30 ++- .../dcemulator/resourcemodel/__init__.py | 23 +- .../dcemulator/resourcemodel/upb/simple.py | 210 ++++++++++++------ .../examples/resource_model_demo_topology.py | 7 - src/emuvim/test/test_resourcemodel_api.py | 112 ++++++---- 5 files changed, 241 insertions(+), 141 deletions(-) diff --git a/src/emuvim/dcemulator/node.py b/src/emuvim/dcemulator/node.py index 2911445..1c85700 100755 --- a/src/emuvim/dcemulator/node.py +++ b/src/emuvim/dcemulator/node.py @@ -140,6 +140,7 @@ class Datacenter(object): if len(network) < 1: network.append({}) + """ # allocate in resource resource model and compute resource limits for new container cpu_limit = mem_limit = disk_limit = -1 cpu_period = cpu_quota = None @@ -169,18 +170,20 @@ class Datacenter(object): if mem_limit < 4: mem_limit = 4 LOG.warning("Increased MEM limit for %r because it was less than 4.0 MB." 
% name) + """ # create the container d = self.net.addDocker( "%s" % (name), dimage=image, dcmd=command, datacenter=self, - flavor_name=flavor_name, - cpu_period=int(cpu_period) if cpu_limit > 0 else None, # set cpu limits if needed - cpu_quota=int(cpu_quota) if cpu_limit > 0 else None, - #mem_limit="%dm" % int(mem_limit) if mem_limit > 0 else None, # set mem limits if needed - #memswap_limit="%dm" % int(mem_limit) if mem_limit > 0 else None # lets set swap to mem limit for now + flavor_name=flavor_name ) + + # apply resource limits to container if a resource model is defined + if self._resource_model is not None: + self._resource_model.allocate(d) + # connect all given networks # if no --net option is given, network = [{}], so 1 empty dict in the list # this results in 1 default interface with a default ip address @@ -190,6 +193,8 @@ class Datacenter(object): # do bookkeeping self.containers[name] = d + # TODO re-enable logging + """ # write resource log if a path is given if self.resource_log_path is not None: l = dict() @@ -205,6 +210,7 @@ class Datacenter(object): # append to logfile with open(self.resource_log_path, "a") as f: f.write("%s\n" % json.dumps(l)) + """ return d # we might use UUIDs for naming later on def stopCompute(self, name): @@ -215,14 +221,21 @@ class Datacenter(object): if name not in self.containers: raise Exception("Container with name %s not found." 
% name) LOG.debug("Stopping compute instance %r in data center %r" % (name, str(self))) + + # call resource model and free resources + if self._resource_model is not None: + self._resource_model.free(self.containers[name]) + + # remove links self.net.removeLink( link=None, node1=self.containers[name], node2=self.switch) + + # remove container self.net.removeDocker("%s" % (name)) del self.containers[name] - # call resource model and free resources - if self._resource_model is not None: - self._resource_model.free(name) + # TODO re-enable logging + """ # write resource log if a path is given if self.resource_log_path is not None: l = dict() @@ -237,6 +250,7 @@ class Datacenter(object): # append to logfile with open(self.resource_log_path, "a") as f: f.write("%s\n" % json.dumps(l)) + """ return True def listCompute(self): diff --git a/src/emuvim/dcemulator/resourcemodel/__init__.py b/src/emuvim/dcemulator/resourcemodel/__init__.py index c28e226..3df414f 100644 --- a/src/emuvim/dcemulator/resourcemodel/__init__.py +++ b/src/emuvim/dcemulator/resourcemodel/__init__.py @@ -76,7 +76,7 @@ class BaseResourceModel(object): self._initDefaultFlavors() self.registrar = None # pointer to registrar self.dcs = list() - self.allocated_compute_instances = dict() + self._allocated_compute_instances = dict() LOG.info("Resource model %r initialized" % self) def __repr__(self): @@ -107,26 +107,21 @@ class BaseResourceModel(object): raise Exception("Flavor with name %r already exists!" % fl.name) self._flavors[fl.name] = fl - def allocate(self, name, flavor_name): + def allocate(self, d): """ This method has to be overwritten by a real resource model. - :param name: Name of the started compute instance. - :param flavor_name: Name of the flavor to be allocated. 
- :return: 3-tuple: (CPU-fraction, Mem-limit, Disk-limit) + :param d: Container object """ - LOG.warning("Allocating in BaseResourceModel: %r with flavor: %r" % (name, flavor_name)) - self.allocated_compute_instances[name] = flavor_name - return -1.0, -1.0, -1.0 # return invalid values to indicate that this RM is a dummy + LOG.warning("Allocating in BaseResourceModel: %r with flavor: %r" % (d.name, d.flavor_name)) + self._allocated_compute_instances[d.name] = d.flavor_name - def free(self, name): + def free(self, d): """ This method has to be overwritten by a real resource model. - :param name: Name of the compute instance that is stopped. - :return: True/False + :param d: Container object """ - LOG.warning("Free in BaseResourceModel: %r" % name) - del self.allocated_compute_instances[name] - return True + LOG.warning("Free in BaseResourceModel: %r" % d.name) + del self._allocated_compute_instances[d.name] def get_state_dict(self): """ diff --git a/src/emuvim/dcemulator/resourcemodel/upb/simple.py b/src/emuvim/dcemulator/resourcemodel/upb/simple.py index 9417f60..ff0c852 100644 --- a/src/emuvim/dcemulator/resourcemodel/upb/simple.py +++ b/src/emuvim/dcemulator/resourcemodel/upb/simple.py @@ -16,7 +16,9 @@ class UpbSimpleCloudDcRM(BaseResourceModel): lifetime. """ - def __init__(self, max_cu=32, max_mu=1024): + def __init__(self, max_cu=32, max_mu=1024, + deactivate_cpu_limit=False, + deactivate_mem_limit=False): """ Initialize model. :param max_cu: Maximum number of compute units available in this DC. @@ -27,106 +29,168 @@ class UpbSimpleCloudDcRM(BaseResourceModel): self.dc_max_mu = max_mu self.dc_alloc_cu = 0 self.dc_alloc_mu = 0 - self.cu = 0 - self.mu = 0 + self.deactivate_cpu_limit = deactivate_cpu_limit + self.deactivate_mem_limit = deactivate_mem_limit super(self.__class__, self).__init__() - def allocate(self, name, flavor_name): + def allocate(self, d): """ - Calculate resources for container with given flavor. - :param name: Container name. 
- :param flavor_name: Flavor name. + Allocate resources for the given container. + Defined by d.flavor_name + :param d: container :return: """ - # bookkeeping and flavor handling - if flavor_name not in self._flavors: - raise Exception("Flavor %r does not exist" % flavor_name) - fl = self._flavors.get(flavor_name) - self.allocated_compute_instances[name] = flavor_name - # calc and return - return self._allocate_cpu(fl), self._allocate_mem(fl), -1.0 # return 3tuple (cpu, memory, disk) + self._allocated_compute_instances[d.name] = d + if not self.deactivate_cpu_limit: + self._allocate_cpu(d) + if not self.deactivate_mem_limit: + self._allocate_mem(d) + self._apply_limits() - def free(self, name): + def _allocate_cpu(self, d): """ - Free resources of given container. - :param name: Container name. + Actually allocate (bookkeeping) + :param d: container :return: """ - if name not in self.allocated_compute_instances: - return False - # bookkeeping - self._free_cpu(self._flavors.get(self.allocated_compute_instances[name])) - self._free_mem(self._flavors.get(self.allocated_compute_instances[name])) - del self.allocated_compute_instances[name] - # we don't have to calculate anything special here in this simple model - return True + fl_cu = self._get_flavor(d).get("compute") + # check for over provisioning + if self.dc_alloc_cu + fl_cu > self.dc_max_cu: + raise Exception("Not enough compute resources left.") + self.dc_alloc_cu += fl_cu - def get_state_dict(self): + def _allocate_mem(self, d): """ - Return the state of the resource model as simple dict. - Helper method for logging functionality. 
+ Actually allocate (bookkeeping) + :param d: container :return: """ - r = dict() - r["e_cpu"] = self.registrar.e_cpu - r["e_mem"] = self.registrar.e_mem - r["dc_max_cu"] = self.dc_max_cu - r["dc_max_mu"] = self.dc_max_mu - r["dc_alloc_cu"] = self.dc_alloc_cu - r["dc_alloc_mu"] = self.dc_alloc_mu - r["cu_cpu_percentage"] = self.cu - r["mu_mem_percentage"] = self.mu - r["allocated_compute_instances"] = self.allocated_compute_instances - return r + fl_mu = self._get_flavor(d).get("memory") + # check for over provisioning + if self.dc_alloc_mu + fl_mu > self.dc_max_mu: + raise Exception("Not enough memory resources left.") + self.dc_alloc_mu += fl_mu - def _allocate_cpu(self, flavor): + def free(self, d): """ - Allocate CPU time. - :param flavor: flavor dict - :return: cpu time fraction + Free resources allocated to the given container. + :param d: container + :return: """ - fl_cu = flavor.get("compute") - # check for over provisioning - if self.dc_alloc_cu + fl_cu > self.dc_max_cu: - raise Exception("Not enough compute resources left.") - self.dc_alloc_cu += fl_cu + del self._allocated_compute_instances[d.name] + if not self.deactivate_cpu_limit: + self._free_cpu(d) + if not self.deactivate_mem_limit: + self._free_mem(d) + self._apply_limits() + + def _free_cpu(self, d): + """ + Free resources. + :param d: container + :return: + """ + self.dc_alloc_cu -= self._get_flavor(d).get("compute") + + def _free_mem(self, d): + """ + Free resources. + :param d: container + :return: + """ + self.dc_alloc_mu -= self._get_flavor(d).get("memory") + + def _apply_limits(self): + """ + Recalculate real resource limits for all allocated containers and apply them + to their cgroups. + We have to recalculate for all to allow e.g. overprovisioning models. 
+ :return: + """ + for d in self._allocated_compute_instances.itervalues(): + if not self.deactivate_cpu_limit: + self._apply_cpu_limits(d) + if not self.deactivate_mem_limit: + self._apply_mem_limits(d) + + def _apply_cpu_limits(self, d): + """ + Calculate real CPU limit (CFS bandwidth) and apply. + :param d: container + :return: + """ + number_cu = self._get_flavor(d).get("compute") # get cpu time fraction for entire emulation e_cpu = self.registrar.e_cpu # calculate cpu time fraction of a single compute unit - self.cu = float(e_cpu) / sum([rm.dc_max_cu for rm in list(self.registrar.resource_models)]) + single_cu = float(e_cpu) / sum([rm.dc_max_cu for rm in list(self.registrar.resource_models)]) # calculate cpu time fraction for container with given flavor - return self.cu * fl_cu + cpu_time_percentage = single_cu * number_cu + # calculate cpu period and quota for CFS + # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt) + # Attention minimum cpu_quota is 1ms (micro) + cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now + cpu_quota = cpu_period * cpu_time_percentage # calculate the fraction of cpu time for this container + # ATTENTION >= 1000 to avoid a invalid argument system error ... no idea why + if cpu_quota < 1000: + cpu_quota = 1000 + LOG.warning("Increased CPU quota for %r to avoid system error." % d.name) + # apply to container if changed + if d.cpu_period != cpu_period or d.cpu_quota != cpu_quota: + LOG.debug("Setting CPU limit for %r: cpu_quota = cpu_period * limit = %f * %f = %f" % ( + d.name, cpu_period, cpu_time_percentage, cpu_quota)) + d.updateCpuLimit(cpu_period=int(cpu_period), cpu_quota=int(cpu_quota)) - def _free_cpu(self, flavor): + def _apply_mem_limits(self, d): """ - Free CPU allocation. - :param flavor: flavor dict + Calculate real mem limit and apply. 
+ :param d: container :return: """ - self.dc_alloc_cu -= flavor.get("compute") + number_mu = self._get_flavor(d).get("memory") + # get memory amount for entire emulation + e_mem = self.registrar.e_mem + # calculate amount of memory for a single mu + single_mu = float(e_mem) / sum([rm.dc_max_mu for rm in list(self.registrar.resource_models)]) + # calculate mem for given flavor + mem_limit = single_mu * number_mu + # ATTENTION minimum mem_limit per container is 4MB + if mem_limit < 4: + mem_limit = 4 + LOG.warning("Increased MEM limit for %r because it was less than 4.0 MB." % d.name) + # to byte! + mem_limit = int(mem_limit*1024*1024) + # apply to container if changed + if d.mem_limit != mem_limit: + LOG.debug("Setting MEM limit for %r: mem_limit = %f MB" % (d.name, mem_limit/1024/1024)) + d.updateMemoryLimit(mem_limit=mem_limit) - def _allocate_mem(self, flavor): + def get_state_dict(self): """ - Allocate mem. - :param flavor: flavor dict - :return: mem limit in MB + Return the state of the resource model as simple dict. + Helper method for logging functionality. 
+ :return: """ - fl_mu = flavor.get("memory") - # check for over provisioning - if self.dc_alloc_mu + fl_mu > self.dc_max_mu: - raise Exception("Not enough memory resources left.") - self.dc_alloc_mu += fl_mu - # get cpu time fraction for entire emulation - e_mem = self.registrar.e_mem - # calculate cpu time fraction of a single compute unit - self.mu = float(e_mem) / sum([rm.dc_max_mu for rm in list(self.registrar.resource_models)]) - # calculate cpu time fraction for container with given flavor - return self.mu * fl_mu + # TODO update + r = dict() + r["e_cpu"] = self.registrar.e_cpu + r["e_mem"] = self.registrar.e_mem + r["dc_max_cu"] = self.dc_max_cu + r["dc_max_mu"] = self.dc_max_mu + r["dc_alloc_cu"] = self.dc_alloc_cu + r["dc_alloc_mu"] = self.dc_alloc_mu + r["cu_cpu_percentage"] = -1 + r["mu_mem_percentage"] = -1 + r["allocated_compute_instances"] = None #self._allocated_compute_instances + return r - def _free_mem(self, flavor): + def _get_flavor(self, d): """ - Free memory allocation - :param flavor: flavor dict + Get flavor assigned to given container. + Identified by d.flavor_name. 
+ :param d: container :return: """ - self.dc_alloc_mu -= flavor.get("memory") + if d.flavor_name not in self._flavors: + raise Exception("Flavor %r does not exist" % d.flavor_name) + return self._flavors.get(d.flavor_name) diff --git a/src/emuvim/examples/resource_model_demo_topology.py b/src/emuvim/examples/resource_model_demo_topology.py index 7a39b00..fb3d1f7 100644 --- a/src/emuvim/examples/resource_model_demo_topology.py +++ b/src/emuvim/examples/resource_model_demo_topology.py @@ -43,13 +43,6 @@ def create_topology1(): # run API endpoint server (in another thread, don't block) zapi1.start() - # add the SONATA dummy gatekeeper to each DC - sdkg1 = SonataDummyGatekeeperEndpoint("0.0.0.0", 8000) - sdkg1.connectDatacenter(dc1) - sdkg1.connectDatacenter(dc2) - # run the dummy gatekeeper (in another thread, don't block) - sdkg1.start() - # start the emulation platform net.start() print "Wait a moment and allocate some compute start some compute resources..." diff --git a/src/emuvim/test/test_resourcemodel_api.py b/src/emuvim/test/test_resourcemodel_api.py index 5266330..b8e66f6 100644 --- a/src/emuvim/test/test_resourcemodel_api.py +++ b/src/emuvim/test/test_resourcemodel_api.py @@ -23,9 +23,6 @@ class testResourceModel(SimpleTestTopology): r.addFlavour(f) self.assertTrue("test" in r._flavors) self.assertTrue(r._flavors.get("test").get("testmetric") == 42) - # test if allocate and free runs through - self.assertTrue(len(r.allocate("testc", "tiny")) == 3) # expected: 3tuple - self.assertTrue(r.free("testc")) def testAddRmToDc(self): """ @@ -51,19 +48,44 @@ class testResourceModel(SimpleTestTopology): self.assertTrue(len(self.net.rm_registrar.resource_models) == 1) # check if alloc was called during startCompute - self.assertTrue(len(r.allocated_compute_instances) == 0) + self.assertTrue(len(r._allocated_compute_instances) == 0) self.dc[0].startCompute("tc1") time.sleep(1) - self.assertTrue(len(r.allocated_compute_instances) == 1) + 
self.assertTrue(len(r._allocated_compute_instances) == 1) # check if free was called during stopCompute self.dc[0].stopCompute("tc1") - self.assertTrue(len(r.allocated_compute_instances) == 0) + self.assertTrue(len(r._allocated_compute_instances) == 0) # check connectivity by using ping self.assertTrue(self.net.ping([self.h[0], self.h[1]]) <= 0.0) # stop Mininet network self.stopNet() +def createDummyContainerObject(name, flavor): + + class DummyContainer(object): + + def __init__(self): + self.cpu_period = -1 + self.cpu_quota = -1 + self.mem_limit = -1 + self.memswap_limit = -1 + + def updateCpuLimit(self, cpu_period, cpu_quota): + self.cpu_period = cpu_period + self.cpu_quota = cpu_quota + + def updateMemoryLimit(self, mem_limit): + self.mem_limit = mem_limit + + d = DummyContainer() + d.name = name + d.flavor_name = flavor + return d + + + + class testUpbSimpleCloudDcRM(SimpleTestTopology): """ Test the UpbSimpleCloudDc resource model. @@ -84,30 +106,34 @@ class testUpbSimpleCloudDcRM(SimpleTestTopology): rm = UpbSimpleCloudDcRM(max_cu=MAX_CU, max_mu=MAX_MU) reg.register("test_dc", rm) - res = rm.allocate("c1", "tiny") # calculate allocation - self.assertEqual(res[0], E_CPU / MAX_CU * 0.5) # validate compute result - self.assertEqual(res[1], float(E_MEM) / MAX_MU * 32) # validate memory result - self.assertTrue(res[2] < 0) # validate disk result + c1 = createDummyContainerObject("c1", flavor="tiny") + rm.allocate(c1) # calculate allocation + self.assertEqual(float(c1.cpu_quota) / c1.cpu_period, E_CPU / MAX_CU * 0.5) # validate compute result + self.assertEqual(float(c1.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 32) # validate memory result + + c2 = createDummyContainerObject("c2", flavor="small") + rm.allocate(c2) # calculate allocation + self.assertEqual(float(c2.cpu_quota) / c2.cpu_period, E_CPU / MAX_CU * 1) # validate compute result + self.assertEqual(float(c2.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 128) # validate memory result + + + c3 = 
createDummyContainerObject("c3", flavor="medium") + res = rm.allocate(c3) # calculate allocation + self.assertEqual(float(c3.cpu_quota) / c3.cpu_period, E_CPU / MAX_CU * 4) # validate compute result + self.assertEqual(float(c3.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 256) # validate memory result + - res = rm.allocate("c2", "small") # calculate allocation - self.assertEqual(res[0], E_CPU / MAX_CU * 1) # validate compute result - self.assertEqual(res[1], float(E_MEM) / MAX_MU * 128) # validate memory result - self.assertTrue(res[2] < 0) # validate disk result + c4 = createDummyContainerObject("c4", flavor="large") + rm.allocate(c4) # calculate allocation + self.assertEqual(float(c4.cpu_quota) / c4.cpu_period, E_CPU / MAX_CU * 8) # validate compute result + self.assertEqual(float(c4.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 512) # validate memory result - res = rm.allocate("c3", "medium") # calculate allocation - self.assertEqual(res[0], E_CPU / MAX_CU * 4) # validate compute result - self.assertEqual(res[1], float(E_MEM) / MAX_MU * 256) # validate memory result - self.assertTrue(res[2] < 0) # validate disk result - res = rm.allocate("c4", "large") # calculate allocation - self.assertEqual(res[0], E_CPU / MAX_CU * 8) # validate compute result - self.assertEqual(res[1], float(E_MEM) / MAX_MU * 512) # validate memory result - self.assertTrue(res[2] < 0) # validate disk result + c5 = createDummyContainerObject("c5", flavor="xlarge") + rm.allocate(c5) # calculate allocation + self.assertEqual(float(c5.cpu_quota) / c5.cpu_period, E_CPU / MAX_CU * 16) # validate compute result + self.assertEqual(float(c5.mem_limit/1024/1024), float(E_MEM) / MAX_MU * 1024) # validate memory result - res = rm.allocate("c5", "xlarge") # calculate allocation - self.assertEqual(res[0], E_CPU / MAX_CU * 16) # validate compute result - self.assertEqual(res[1], float(E_MEM) / MAX_MU * 1024) # validate memory result - self.assertTrue(res[2] < 0) # validate disk result def 
testAllocationCpuLimit(self): """ @@ -127,10 +153,14 @@ class testUpbSimpleCloudDcRM(SimpleTestTopology): # test over provisioning exeption exception = False try: - rm.allocate("c6", "xlarge") # calculate allocation - rm.allocate("c7", "xlarge") # calculate allocation - rm.allocate("c8", "xlarge") # calculate allocation - rm.allocate("c9", "xlarge") # calculate allocation + c6 = createDummyContainerObject("c6", flavor="xlarge") + c7 = createDummyContainerObject("c7", flavor="xlarge") + c8 = createDummyContainerObject("c8", flavor="xlarge") + c9 = createDummyContainerObject("c9", flavor="xlarge") + rm.allocate(c6) # calculate allocation + rm.allocate(c7) # calculate allocation + rm.allocate(c8) # calculate allocation + rm.allocate(c9) # calculate allocation except Exception as e: self.assertIn("Not enough compute", e.message) exception = True @@ -154,9 +184,12 @@ class testUpbSimpleCloudDcRM(SimpleTestTopology): # test over provisioning exeption exception = False try: - rm.allocate("c6", "xlarge") # calculate allocation - rm.allocate("c7", "xlarge") # calculate allocation - rm.allocate("c8", "xlarge") # calculate allocation + c6 = createDummyContainerObject("c6", flavor="xlarge") + c7 = createDummyContainerObject("c7", flavor="xlarge") + c8 = createDummyContainerObject("c8", flavor="xlarge") + rm.allocate(c6) # calculate allocation + rm.allocate(c7) # calculate allocation + rm.allocate(c8) # calculate allocation except Exception as e: self.assertIn("Not enough memory", e.message) exception = True @@ -174,9 +207,10 @@ class testUpbSimpleCloudDcRM(SimpleTestTopology): reg = ResourceModelRegistrar(dc_emulation_max_cpu=1.0, dc_emulation_max_mem=512) rm = UpbSimpleCloudDcRM(max_cu=100, max_mu=100) reg.register("test_dc", rm) - rm.allocate("c1", "tiny") # calculate allocation + c1 = createDummyContainerObject("c6", flavor="tiny") + rm.allocate(c1) # calculate allocation self.assertTrue(rm.dc_alloc_cu == 0.5) - rm.free("c1") + rm.free(c1) self.assertTrue(rm.dc_alloc_cu == 
0) def testInRealTopo(self): @@ -203,17 +237,17 @@ class testUpbSimpleCloudDcRM(SimpleTestTopology): self.assertTrue(len(self.net.rm_registrar.resource_models) == 1) # check if alloc was called during startCompute - self.assertTrue(len(r.allocated_compute_instances) == 0) + self.assertTrue(len(r._allocated_compute_instances) == 0) tc1 = self.dc[0].startCompute("tc1", flavor_name="tiny") time.sleep(1) - self.assertTrue(len(r.allocated_compute_instances) == 1) + self.assertTrue(len(r._allocated_compute_instances) == 1) # check if there is a real limitation set for containers cgroup - self.assertEqual(tc1.cpu_period/tc1.cpu_quota, 100) + self.assertEqual(float(tc1.cpu_quota)/tc1.cpu_period, 0.005) # check if free was called during stopCompute self.dc[0].stopCompute("tc1") - self.assertTrue(len(r.allocated_compute_instances) == 0) + self.assertTrue(len(r._allocated_compute_instances) == 0) # check connectivity by using ping self.assertTrue(self.net.ping([self.h[0], self.h[1]]) <= 0.0) # stop Mininet network -- 2.25.1