--- /dev/null
+"""
+Copyright (c) 2015 SONATA-NFV and Paderborn University
+ALL RIGHTS RESERVED.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
+nor the names of its contributors may be used to endorse or promote
+products derived from this software without specific prior written
+permission.
+
+This work has been performed in the framework of the SONATA project,
+funded by the European Commission under Grant number 671517 through
+the Horizon 2020 and 5G-PPP programmes. The authors would like to
+acknowledge the contributions of their colleagues of the SONATA
+partner consortium (www.sonata-nfv.eu).
+"""
+
+"""
+Monitor the skewness of the resource usage probability distribution
+and export to a Prometheus Push Gateway
+(c) 2017 by Steven Van Rossem <steven.vanrossem@intec.ugent.be>
+"""
+
+#!/usr/bin/python3
+
+from time import sleep, time, perf_counter
+import math
+from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
+ pushadd_to_gateway, push_to_gateway, delete_from_gateway
+import threading
+import os
+import json
+
+import logging
+LOG = logging.getLogger('skewmon')
+LOG.setLevel(level=logging.DEBUG)
+LOG.addHandler(logging.StreamHandler())
+
+
+# put env vars in Dockerfile
+PUSHGATEWAY_IP = '172.17.0.1'
+PUSHGATEWAY_PORT = 9091
+PUSHGATEWAY_ADDR = ':'.join([PUSHGATEWAY_IP, str(PUSHGATEWAY_PORT)])
+
+
+#general settings (ms)
+SAMPLE_PERIOD = int(os.environ['SAMPLE_PERIOD'])
+TOTAL_PERIOD = int(os.environ['TOTAL_PERIOD'])
+
+# define global variables
+registry = CollectorRegistry()
+exported_metric = Gauge('skewness', 'Skewness of docker vnf resource usage',
+ ['vnf_id', 'vnf_name', 'vnf_metric'], registry=registry)
+
+# find the VNFs to monitor
+# {metric_shortId: {VNF_NAME:<>,VNF_ID:<>,VNF_METRIC:<>}}
+
+def get_vnfs_to_monitor(config):
+ for key in config:
+ vnf_name = config[key].get('VNF_NAME')
+ vnf_id = config[key].get('VNF_ID')
+ vnf_metric = config[key].get('VNF_METRIC')
+ yield (vnf_id, vnf_name, vnf_metric)
+
+# export metric to the Prometheus PushGateway
+def export_metrics(key=None):
+ try:
+ pushadd_to_gateway(PUSHGATEWAY_ADDR, job='sonemu-skewmon', registry=registry, grouping_key=key)
+ except Exception as e:
+ LOG.warning("Pushgateway not reachable: {0}".format(str(e)))
+
+class skewness_monitor():
+ def __init__(self, docker_id, docker_name, metric):
+ # Prometheus metric to export
+ self.prom_skewness = exported_metric
+ self.docker_id = docker_id
+ self.docker_name = docker_name
+ self.vnf_metric = metric
+
+ # https://www.datadoghq.com/blog/how-to-collect-docker-metrics/
+ self.cpu_proc_file = '/sys/fs/cgroup/cpuacct/docker/{0}/cpuacct.usage'.format(self.docker_id)
+ self.mem_proc_file = '/sys/fs/cgroup/memory/docker/{0}/memory.usage_in_bytes'.format(self.docker_id)
+ metric_dict = {'cpu': self.cpu_proc_file,
+ 'mem': self.mem_proc_file}
+
+ self.proc_file = metric_dict[metric]
+
+ self.fp = open(self.proc_file)
+
+ #monitoring thread
+ self.export_thread = None
+ self.monitor_stop = threading.Event()
+
+ # get statistics with certain frequency and export skewness for further analysis
+ def _calc_skewness(self):
+
+ cpu_count0 = 0
+ time0 = 0
+
+ #milliseconds
+ stat_delta = SAMPLE_PERIOD
+ sample_T = TOTAL_PERIOD
+
+ data = []
+ n = 0
+
+ moment1 = 0
+ moment2 = 0
+ moment3 = 0
+
+ fp = self.fp
+
+ #collect samples
+ for n in range(0,round(sample_T/stat_delta)):
+ # first measurement
+ if cpu_count0 <= 0 or time0 <= 0:
+ time0 = perf_counter()
+ cpu_count0 = int(fp.read().strip())
+ fp.seek(0)
+ sleep(stat_delta/1000)
+ continue
+
+
+ #perf_counter in seconds
+ time1 = perf_counter()
+
+ # cpu count in nanoseconds
+ cpu_count1 = int(fp.read().strip())
+ fp.seek(0)
+
+ cpu_delta = cpu_count1 - cpu_count0
+ cpu_count0 = cpu_count1
+
+ time_delta = time1 - time0
+ time0 = time1
+
+ #work in nanoseconds
+ metric = (cpu_delta / (time_delta * 1e9))
+
+ data.append(metric)
+
+ #running calculation of sample moments
+ moment1 += metric
+ temp = metric * metric
+ moment2 += temp
+ moment3 += temp * metric
+
+
+ sleep(stat_delta/1000)
+
+ # calc skewness
+ M1 = (1 / n) * moment1
+ M2 = ((1 / n) * moment2) - M1**2
+ M3 = ((1 / n) * moment3) - (3 * M1 * ((1 / n) * moment2)) + (2 * M1**3)
+
+ s2 = (math.sqrt(n*(n - 1))/(n - 2)) * (M3 / (M2)**1.5)
+
+ LOG.info("docker_name: {0} metric: {1}".format(self.docker_name, self.vnf_metric))
+ LOG.info("Nsamples: {0}".format(n))
+ LOG.info("skewness: {0:.2f}".format(s2))
+ LOG.info("\n")
+
+ return s2
+
+ def _export_skewness_loop(self, stop_event):
+ #loop until flag is set
+ while(not stop_event.is_set()):
+ try:
+ skewness = self._calc_skewness()
+ self.prom_skewness.labels(vnf_id=self.docker_id, vnf_name=self.docker_name, vnf_metric=self.vnf_metric)\
+ .set(skewness)
+ except ZeroDivisionError as e:
+ self.prom_skewness.labels(vnf_id=self.docker_id, vnf_name=self.docker_name, vnf_metric=self.vnf_metric) \
+ .set(float('nan'))
+ LOG.warning("{1}: Skewness cannot be calculated: {0}".format(str(e), self.docker_name))
+ except Exception as e:
+ LOG.warning("Skewness cannot be calculated, stop thread: {0}".format(str(e)))
+ self.monitor_stop.set()
+
+ # if while has ended, monitoring thread will stop
+ self.prom_skewness.labels(vnf_id=self.docker_id, vnf_name=self.docker_name, vnf_metric=self.vnf_metric) \
+ .set(float('nan'))
+
+ #start the monitoring thread
+ def start(self):
+ if self.export_thread is not None:
+ LOG.warning('monitor thread is already running for: {0}'.format(self.docker_name))
+ return
+
+ self.export_thread = threading.Thread(target=self._export_skewness_loop, args=(self.monitor_stop,))
+ self.export_thread.start()
+ LOG.info('started thread: {0}'.format(self.docker_name))
+
+ #stop the monitoring thread
+ def stop(self):
+ self.monitor_stop.set()
+
+
+if __name__ == '__main__':
+
+ #started_vnfs {vnf_id:object}
+ vnfs_monitored = {}
+
+ # endless loop
+ while True:
+ #check config.txt for docker ids/names
+ configfile = open('/config.txt', 'r')
+ config = json.load(configfile)
+ vnfs_to_monitor = list(get_vnfs_to_monitor(config))
+
+ #for each new docker id in ENV start thread to monitor skewness
+ for vnf_id, vnf_name, vnf_metric in vnfs_to_monitor:
+ key = '_'.join([vnf_metric, vnf_id])
+ if key not in vnfs_monitored:
+ try:
+ vnfs_monitored[key] = skewness_monitor(vnf_id, vnf_name, vnf_metric)
+ vnfs_monitored[key].start()
+ except Exception as e:
+ LOG.warning("Monitor cannot be started: {0}".format(str(e)))
+
+ #for each removed docker id ENV, stop export
+ for vnf_key in list(vnfs_monitored):
+ vnf_keys_to_monitor = ['_'.join([vnf_metric, vnf_id]) for vnf_id, vnf_name, vnf_metric in vnfs_to_monitor]
+ if vnf_key not in vnf_keys_to_monitor:
+ vnfs_monitored[vnf_key].stop()
+
+ vnf_name = vnfs_monitored[vnf_key].docker_name
+ vnf_metric, vnf_id = vnf_key.split('_')
+ LOG.info('stop monitored VNFs: {0}'.format(vnfs_monitored[vnf_key].docker_name))
+ del vnfs_monitored[vnf_key]
+
+ # remove metric with labels from registry
+ # (Push Gateway remembers last pushed value, so this is not so useful)
+ # collector = registry._names_to_collectors['skewness']
+ # if (vnf_id, vnf_name, vnf_metric) in collector._metrics:
+ # collector.remove(vnf_id, vnf_name, vnf_metric)
+ delete_from_gateway(PUSHGATEWAY_ADDR, job='sonemu-skewmon')
+
+
+ #push to Prometheus gateway
+ export_metrics()
+ LOG.info('monitored VNFs: {0}'.format([monitor.docker_name for key, monitor in vnfs_monitored.items()]))
+ # wait before checking again
+ sleep(TOTAL_PERIOD/1000)
\ No newline at end of file