Modified CI integration.
[osm/vim-emu.git] / misc / son-monitor / skewmon / skewmon.py
1 """
2 Copyright (c) 2015 SONATA-NFV and Paderborn University
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28
29 """
30 Monitor the skewness of the resource usage probability distribution
31 and export to a Prometheus Push Gateway
32 (c) 2017 by Steven Van Rossem <steven.vanrossem@intec.ugent.be>
33 """
34
35 #!/usr/bin/python3
36
37 from time import sleep, time, perf_counter
38 import math
39 from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
40 pushadd_to_gateway, push_to_gateway, delete_from_gateway
41 import threading
42 import os
43 import json
44
45 import logging
# Module logger: DEBUG level, records go through a default StreamHandler.
LOG = logging.getLogger('skewmon')
LOG.addHandler(logging.StreamHandler())
LOG.setLevel(logging.DEBUG)


# Address of the Prometheus Push Gateway.
# (original note: put env vars in Dockerfile)
PUSHGATEWAY_IP = '172.17.0.1'
PUSHGATEWAY_PORT = 9091
PUSHGATEWAY_ADDR = '{0}:{1}'.format(PUSHGATEWAY_IP, PUSHGATEWAY_PORT)


# General settings, in milliseconds. Both variables must be present in the
# environment; a missing one raises KeyError at import time.
SAMPLE_PERIOD = int(os.environ['SAMPLE_PERIOD'])
TOTAL_PERIOD = int(os.environ['TOTAL_PERIOD'])

# Global Prometheus objects: a dedicated registry and the single gauge,
# labelled per VNF and per metric, that every monitor writes to.
registry = CollectorRegistry()
exported_metric = Gauge(
    'skewness',
    'Skewness of docker vnf resource usage',
    ['vnf_id', 'vnf_name', 'vnf_metric'],
    registry=registry,
)
65
66 # find the VNFs to monitor
67 # {metric_shortId: {VNF_NAME:<>,VNF_ID:<>,VNF_METRIC:<>}}
68
def get_vnfs_to_monitor(config):
    """Yield one ``(vnf_id, vnf_name, vnf_metric)`` tuple per config entry.

    ``config`` maps an arbitrary key to a mapping that may carry the keys
    ``VNF_ID``, ``VNF_NAME`` and ``VNF_METRIC``. A key that is absent from an
    entry shows up as ``None`` in the yielded tuple.
    """
    wanted_fields = ('VNF_ID', 'VNF_NAME', 'VNF_METRIC')
    for short_id in config:
        entry = config[short_id]
        yield tuple(entry.get(field) for field in wanted_fields)
75
76 # export metric to the Prometheus PushGateway
def export_metrics(key=None):
    """Push the module-level registry to the Push Gateway (best effort).

    ``key`` is handed to ``pushadd_to_gateway`` as its ``grouping_key``.
    Any exception raised by the push is logged as a warning and swallowed,
    so an unreachable gateway never interrupts the caller.
    """
    try:
        pushadd_to_gateway(
            PUSHGATEWAY_ADDR,
            job='sonemu-skewmon',
            registry=registry,
            grouping_key=key,
        )
    except Exception as err:
        LOG.warning("Pushgateway not reachable: {0}".format(str(err)))
82
class skewness_monitor():
    """Sample one cgroup counter of a docker container and export its skewness.

    A monitor owns one open cgroup file (selected by ``metric``) and, once
    started, one thread that repeatedly collects samples for ``TOTAL_PERIOD``
    milliseconds (one sample every ``SAMPLE_PERIOD`` milliseconds), computes
    the sample skewness and stores it in the shared ``exported_metric`` gauge
    under the labels ``(vnf_id, vnf_name, vnf_metric)``.

    NOTE(review): each sample is the counter delta divided by the elapsed
    time in nanoseconds. The same formula is applied to the 'mem' file
    (``memory.usage_in_bytes``) -- confirm this is intended for that metric.
    """

    def __init__(self, docker_id, docker_name, metric):
        """Select and open the cgroup file for ``metric``.

        :param docker_id: container id, used in the cgroup path and as label
        :param docker_name: container name, used as label and in log lines
        :param metric: ``'cpu'`` or ``'mem'``
        :raises KeyError: if ``metric`` is neither ``'cpu'`` nor ``'mem'``
        :raises OSError: if the selected cgroup file cannot be opened
        """
        # Prometheus metric to export
        self.prom_skewness = exported_metric
        self.docker_id = docker_id
        self.docker_name = docker_name
        self.vnf_metric = metric

        # https://www.datadoghq.com/blog/how-to-collect-docker-metrics/
        self.cpu_proc_file = '/sys/fs/cgroup/cpuacct/docker/{0}/cpuacct.usage'.format(self.docker_id)
        self.mem_proc_file = '/sys/fs/cgroup/memory/docker/{0}/memory.usage_in_bytes'.format(self.docker_id)
        metric_dict = {'cpu': self.cpu_proc_file,
                       'mem': self.mem_proc_file}

        self.proc_file = metric_dict[metric]

        # kept open for the lifetime of the monitoring thread, which closes it
        self.fp = open(self.proc_file)

        # monitoring thread
        self.export_thread = None
        self.monitor_stop = threading.Event()

    def _read_counter(self):
        """Return the integer currently in the cgroup file and rewind it."""
        value = int(self.fp.read().strip())
        self.fp.seek(0)
        return value

    def _publish(self, value):
        """Store ``value`` in the gauge child that carries this monitor's labels."""
        self.prom_skewness.labels(vnf_id=self.docker_id, vnf_name=self.docker_name,
                                  vnf_metric=self.vnf_metric).set(value)

    @staticmethod
    def _skewness_from_sums(n, sum1, sum2, sum3):
        """Return the bias-adjusted sample skewness from running power sums.

        :param n: number of samples
        :param sum1: sum of the samples
        :param sum2: sum of the squared samples
        :param sum3: sum of the cubed samples
        :raises ZeroDivisionError: if ``n`` is 0 or 2, or if the variance of
            the samples is not strictly positive
        """
        M1 = (1 / n) * sum1
        M2 = ((1 / n) * sum2) - M1**2
        M3 = ((1 / n) * sum3) - (3 * M1 * ((1 / n) * sum2)) + (2 * M1**3)

        correction = math.sqrt(n * (n - 1)) / (n - 2)

        # Rounding can push the variance of (nearly) constant samples just
        # below zero; a negative base with a fractional exponent would yield
        # a complex number that the gauge cannot store. Treat it the same as
        # a zero variance: the skewness is undefined.
        if M2 <= 0:
            raise ZeroDivisionError('variance of the samples is not positive')

        return correction * (M3 / M2**1.5)

    # get statistics with certain frequency and export skewness for further analysis
    def _calc_skewness(self):
        """Collect samples for one ``TOTAL_PERIOD`` and return their skewness.

        The first loop iteration only records a baseline (time and counter);
        every later iteration turns the counter delta since the previous read
        into one sample. Blocks for roughly ``TOTAL_PERIOD`` milliseconds.

        :raises ZeroDivisionError: if the skewness is undefined for the
            collected samples (too few samples, zero elapsed time, or no
            variance)
        """
        # milliseconds
        stat_delta = SAMPLE_PERIOD
        sample_T = TOTAL_PERIOD

        count0 = 0
        time0 = 0

        # number of samples actually collected (the baseline read is not one)
        n = 0

        moment1 = 0
        moment2 = 0
        moment3 = 0

        # collect samples
        for _ in range(0, round(sample_T / stat_delta)):
            # first measurement
            if count0 <= 0 or time0 <= 0:
                time0 = perf_counter()
                count0 = self._read_counter()
                sleep(stat_delta / 1000)
                continue

            # perf_counter in seconds
            time1 = perf_counter()

            # cpu count in nanoseconds
            count1 = self._read_counter()

            count_delta = count1 - count0
            count0 = count1

            time_delta = time1 - time0
            time0 = time1

            # work in nanoseconds
            metric = (count_delta / (time_delta * 1e9))
            n += 1

            # running calculation of sample moments
            moment1 += metric
            temp = metric * metric
            moment2 += temp
            moment3 += temp * metric

            sleep(stat_delta / 1000)

        # calc skewness
        s2 = self._skewness_from_sums(n, moment1, moment2, moment3)

        LOG.info("docker_name: {0} metric: {1}".format(self.docker_name, self.vnf_metric))
        LOG.info("Nsamples: {0}".format(n))
        LOG.info("skewness: {0:.2f}".format(s2))
        LOG.info("\n")

        return s2

    def _export_skewness_loop(self, stop_event):
        """Thread body: compute and publish the skewness until stopped.

        An undefined skewness (``ZeroDivisionError``) is published as NaN and
        the loop continues. Any other exception is logged and stops the
        monitor. On exit the gauge is set to NaN and the cgroup file is closed.

        :param stop_event: ``threading.Event`` that ends the loop once set
        """
        try:
            # loop until flag is set
            while not stop_event.is_set():
                try:
                    self._publish(self._calc_skewness())
                except ZeroDivisionError as e:
                    self._publish(float('nan'))
                    LOG.warning("{1}: Skewness cannot be calculated: {0}".format(str(e), self.docker_name))
                except Exception as e:
                    LOG.warning("Skewness cannot be calculated, stop thread: {0}".format(str(e)))
                    self.monitor_stop.set()

            # if while has ended, monitoring thread will stop
            self._publish(float('nan'))
        finally:
            # the thread is the only reader; release the file descriptor with it
            self.fp.close()

    # start the monitoring thread
    def start(self):
        """Start the monitoring thread; warn and return if one was already created."""
        if self.export_thread is not None:
            LOG.warning('monitor thread is already running for: {0}'.format(self.docker_name))
            return

        self.export_thread = threading.Thread(target=self._export_skewness_loop, args=(self.monitor_stop,))
        self.export_thread.start()
        LOG.info('started thread: {0}'.format(self.docker_name))

    # stop the monitoring thread
    def stop(self):
        """Ask the monitoring thread to stop; it exits after its current sampling round."""
        self.monitor_stop.set()
208
209
if __name__ == '__main__':

    # started monitors, keyed by '<vnf_metric>_<vnf_id>'
    vnfs_monitored = {}

    # endless loop
    while True:
        # check config.txt for docker ids/names; the file is re-read every
        # round, so close it again right away instead of leaking the handle
        with open('/config.txt', 'r') as configfile:
            config = json.load(configfile)
        vnfs_to_monitor = list(get_vnfs_to_monitor(config))

        # keys of every VNF/metric pair currently listed in the config
        keys_to_monitor = set()

        # for each new docker id in ENV start thread to monitor skewness
        for vnf_id, vnf_name, vnf_metric in vnfs_to_monitor:
            key = '_'.join([vnf_metric, vnf_id])
            keys_to_monitor.add(key)
            if key not in vnfs_monitored:
                try:
                    vnfs_monitored[key] = skewness_monitor(vnf_id, vnf_name, vnf_metric)
                    vnfs_monitored[key].start()
                except Exception as e:
                    LOG.warning("Monitor cannot be started: {0}".format(str(e)))

        # for each removed docker id ENV, stop export
        for vnf_key in list(vnfs_monitored):
            if vnf_key in keys_to_monitor:
                continue

            monitor = vnfs_monitored.pop(vnf_key)
            monitor.stop()
            LOG.info('stop monitored VNFs: {0}'.format(monitor.docker_name))

            # Removing only this monitor's labelled child from the registry
            # is not so useful because the Push Gateway remembers the last
            # pushed value; delete the whole job from the gateway instead.
            # Like the push below, this is best effort: an unreachable
            # gateway must not end the main loop.
            try:
                delete_from_gateway(PUSHGATEWAY_ADDR, job='sonemu-skewmon')
            except Exception as e:
                LOG.warning("Pushgateway not reachable: {0}".format(str(e)))

        # push to Prometheus gateway
        export_metrics()
        LOG.info('monitored VNFs: {0}'.format([monitor.docker_name for key, monitor in vnfs_monitored.items()]))
        # wait before checking again
        sleep(TOTAL_PERIOD/1000)