Unified command execution in Docker containers.
[osm/vim-emu.git] / src / emuvim / api / rest / compute.py
1 # Copyright (c) 2015 SONATA-NFV and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 import logging
27 from flask_restful import Resource
28 from flask import request
29 import json
30 import threading
31
# Install the default root-logger handler so the logging calls below are emitted.
logging.basicConfig()

# Extra response headers attached to every reply built in this module.
CORS_HEADER = {"Access-Control-Allow-Origin": "*"}

# Mapping of datacenter label -> datacenter object.
# The dcs dict is set in rest_api_endpoint.py upon datacenter init.
dcs = dict()
38
39
class Compute(Resource):
    """
    REST resource for a single compute instance (a Docker container) that
    lives inside one emulated datacenter.

    PUT starts the container, GET returns its status and DELETE stops it.
    (note: zerorpc does not support keyword arguments)
    """

    def put(self, dc_label, compute_name, resource=None, value=None):
        """
        Start a new compute instance: A docker container.

        The JSON request body may contain the keys ``image`` (image name),
        ``docker_command`` (command to execute) and ``network`` (interface
        description string, see ``_parse_network``).

        :param dc_label: name of the DC
        :param compute_name: compute container name
        :param resource: unused, kept for interface compatibility
        :param value: unused, kept for interface compatibility
        :return: tuple (status of the started container as returned by its
                 getStatus(), 200, CORS headers) on success, or
                 (error message, 500, CORS headers) on failure
        """
        # Read the body outside the try block so that Flask's own handling of
        # an undecodable body is not turned into a generic 500 response.
        data = request.json

        try:
            if data is None:
                data = {}
            elif not isinstance(data, dict):
                # The body was a JSON-encoded string: decode it a second time.
                data = json.loads(data)
            if not isinstance(data, dict):
                raise ValueError("Request body must be a JSON object.")

            nw_list = self._parse_network(data.get("network"))
            image = data.get("image")
            command = data.get("docker_command")

            if compute_name is None or compute_name == "None":
                logging.error("No compute name defined in request.")
                return "No compute name defined in request.", 500, CORS_HEADER
            dc = dcs.get(dc_label)
            if dc_label is None or dc is None:
                logging.error("No datacenter defined in request.")
                return "No datacenter defined in request.", 500, CORS_HEADER

            c = dc.startCompute(
                compute_name, image=image, command=command, network=nw_list)
            # (if available) trigger emu. entry point given in Dockerfile
            self._trigger_emu_entry_point(c, compute_name)
            # return docker inspect dict
            return c.getStatus(), 200, CORS_HEADER
        except Exception as ex:
            logging.exception("API error.")
            # str(ex) works on Python 2 and 3; ex.message does not exist on 3.
            return str(ex), 500, CORS_HEADER

    def _trigger_emu_entry_point(self, container, compute_name):
        """
        Run the emulator entry point of a freshly started container.

        Scans the "Env" list of the container's "Config" section (read from
        ``container.dcinfo``) for SON_EMU_CMD / VIM_EMU_CMD and executes each
        value found via ``container.cmdPrint``. This is best effort: any
        failure is logged and never propagated to the caller.

        :param container: object returned by startCompute()
        :param compute_name: compute container name (used for logging only)
        """
        try:
            # "Config"/"Env" may be missing or null: treat both as empty.
            config = container.dcinfo.get("Config") or dict()
            env = config.get("Env") or list()
            for env_var in env:
                # partition() never raises, so an entry without '=' no longer
                # aborts the scan of the remaining variables.
                var, _, cmd = str(env_var).partition('=')
                var, cmd = var.strip(), cmd.strip()
                logging.debug("%r = %r" % (var, cmd))
                if var in ("SON_EMU_CMD", "VIM_EMU_CMD"):
                    logging.info("Executing script in '{}': {}={}"
                                 .format(compute_name, var, cmd))
                    # execute command in new thread to ensure that API is
                    # not blocked by VNF
                    t = threading.Thread(
                        target=container.cmdPrint, args=(cmd,))
                    t.daemon = True
                    t.start()
        except Exception:
            logging.warning("Couldn't run Docker entry point VIM_EMU_CMD")
            logging.exception("Exception:")

    def get(self, dc_label, compute_name):
        """
        Return the status of one compute instance.

        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: tuple (container status, 200, CORS headers) on success, or
                 (error message, 500, CORS headers) on failure
        """
        logging.debug("API CALL: compute status")

        try:
            return dcs.get(dc_label).containers.get(
                compute_name).getStatus(), 200, CORS_HEADER
        except Exception as ex:
            logging.exception("API error.")
            return str(ex), 500, CORS_HEADER

    def delete(self, dc_label, compute_name):
        """
        Stop one compute instance.

        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: tuple (result of stopCompute(), 200, CORS headers) on
                 success, or (error message, 500, CORS headers) on failure
        """
        logging.debug("API CALL: compute stop")
        try:
            return dcs.get(dc_label).stopCompute(
                compute_name), 200, CORS_HEADER
        except Exception as ex:
            logging.exception("API error.")
            return str(ex), 500, CORS_HEADER

    def _parse_network(self, network_str):
        '''
        parse the options for all network interfaces of the vnf

        Every interface is one parenthesised group of comma separated
        key=value options. Whitespace around groups, keys and values is
        ignored and only the first '=' of an option separates key and value.

        :param network_str: (id=x,ip=x.x.x.x/x), ...
        :return: list of dicts [{"id":x,"ip":"x.x.x.x/x"}, ...]
        :raises ValueError: if the string contains no parenthesised group
                            or an option is not of the form key=value
        '''
        # Imported locally so this block does not depend on a file-level import.
        import re

        nw_list = list()

        # No (or an empty) network description means: no interfaces.
        if network_str is None or not network_str.strip():
            return nw_list

        groups = re.findall(r'\(([^()]*)\)', network_str)
        if not groups:
            raise ValueError(
                "Malformed network description %r "
                "(expected '(key=value,...),(...)')." % (network_str,))

        for nw in groups:
            nw_dict = dict()
            for option in nw.split(','):
                key, sep, val = option.partition('=')
                if not sep:
                    raise ValueError(
                        "Malformed network option %r "
                        "(expected key=value)." % (option,))
                nw_dict[key.strip()] = val.strip()
            nw_list.append(nw_dict)

        return nw_list
140
141
class ComputeList(Resource):
    """
    REST resource listing the compute instances and external SAPs of one
    datacenter, or of all datacenters.
    """

    def get(self, dc_label=None):
        """
        List compute nodes and external SAPs.

        :param dc_label: name of the DC; None or the string 'None' selects
                         all known datacenters
        :return: tuple (list of (name, status) pairs with all containers
                 first and all external SAPs after them, 200, CORS headers)
                 on success, or (error message, 500, CORS headers) on failure
        """
        logging.debug("API CALL: compute list")
        try:
            if dc_label is None or dc_label == 'None':
                # all compute nodes in all DCs
                # (dict.values() works on Python 2 and 3, itervalues() only on 2)
                selected_dcs = list(dcs.values())
            else:
                # compute nodes of the specified DC only
                dc = dcs.get(dc_label)
                if dc is None:
                    message = "Datacenter '{}' not found.".format(dc_label)
                    logging.error(message)
                    return message, 500, CORS_HEADER
                selected_dcs = [dc]

            all_containers = []
            all_extSAPs = []
            for dc in selected_dcs:
                all_containers += dc.listCompute()
                all_extSAPs += dc.listExtSAPs()

            container_list = [(c.name, c.getStatus())
                              for c in all_containers]
            extSAP_list = [(sap.name, sap.getStatus())
                           for sap in all_extSAPs]
            total_list = container_list + extSAP_list
            return total_list, 200, CORS_HEADER
        except Exception as ex:
            logging.exception("API error.")
            # str(ex) works on Python 2 and 3; ex.message does not exist on 3.
            return str(ex), 500, CORS_HEADER
173
174
class ComputeResources(Resource):
    """
    Update the container's resources using the docker.update function
    re-using the same parameters:
        url params:
            blkio_weight
            cpu_period, cpu_quota, cpu_shares
            cpuset_cpus
            cpuset_mems
            mem_limit
            mem_reservation
            memswap_limit
            kernel_memory
            restart_policy
    see https://docs.docker.com/engine/reference/commandline/update/
    or API docs: https://docker-py.readthedocs.io/en/stable/api.html#module-docker.api.container

    In addition the url param ``cpu_bw`` is accepted: it is multiplied with
    the network's cpu_period to derive cpu_quota; a value <= 0 puts the
    defaults back (cpu_period=100000, cpu_quota=-1).
    """

    def put(self, dc_label, compute_name):
        """
        Apply the resource limits given as URL parameters to a container.

        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: tuple (status of the updated container, 200, CORS headers)
                 on success, or (error message, 500, CORS headers) on failure
        """
        logging.debug("REST CALL: update container resources")

        try:
            c = self._update_resources(dc_label, compute_name)
            return c.getStatus(), 200, CORS_HEADER
        except Exception as ex:
            logging.exception("API error.")
            # str(ex) works on Python 2 and 3; ex.message does not exist on 3.
            return str(ex), 500, CORS_HEADER

    def _update_resources(self, dc_label, compute_name):
        """
        Translate the URL parameters of the current request into a call to
        update_resources() on the addressed node.

        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: the node object the update was applied to
        :raises ValueError: if dc_label does not name a known datacenter or
                            cpu_bw is not a number
        """
        # get URL parameters
        url_params = request.args
        logging.debug(
            "REST CALL: update container resources {0}".format(url_params))

        dc = dcs.get(dc_label)
        if dc is None:
            raise ValueError("Datacenter '{}' not found.".format(dc_label))

        # check if container exists
        d = dc.net.getNodeByName(compute_name)

        # create a mutable copy
        params = url_params.to_dict()

        # general request of cpu percentage
        if 'cpu_bw' in params:
            cpu_period = int(dc.net.cpu_period)
            cpu_bw = float(params.get('cpu_bw'))
            cpu_quota = int(cpu_period * cpu_bw)
            # put default values back
            if cpu_bw <= 0:
                cpu_period = 100000
                cpu_quota = -1
            params['cpu_period'] = cpu_period
            params['cpu_quota'] = cpu_quota

        # only pass allowed keys to docker
        allowed_keys = ('blkio_weight', 'cpu_period', 'cpu_quota',
                        'cpu_shares', 'cpuset_cpus', 'cpuset_mems',
                        'mem_limit', 'mem_reservation', 'memswap_limit',
                        'kernel_memory', 'restart_policy')
        filtered_params = {key: params[key]
                           for key in allowed_keys if key in params}

        d.update_resources(**filtered_params)

        return d
245
246
class DatacenterList(Resource):
    """REST resource listing the status of every known datacenter."""

    def get(self):
        """
        Return the status of all datacenters.

        :return: tuple (list with the getStatus() result of each datacenter,
                 200, CORS headers) on success, or
                 (error message, 500, CORS headers) on failure
        """
        logging.debug("API CALL: datacenter list")
        try:
            # dict.values() works on Python 2 and 3, itervalues() only on 2.
            return [d.getStatus() for d in dcs.values()], 200, CORS_HEADER
        except Exception as ex:
            logging.exception("API error.")
            # str(ex) works on Python 2 and 3; ex.message does not exist on 3.
            return str(ex), 500, CORS_HEADER
257
258
class DatacenterStatus(Resource):
    """REST resource returning the status of a single datacenter."""

    def get(self, dc_label):
        """
        Return the status of one datacenter.

        :param dc_label: name of the DC
        :return: tuple (getStatus() result of the datacenter, 200,
                 CORS headers) on success, or
                 (error message, 500, CORS headers) on failure
        """
        logging.debug("API CALL: datacenter status")
        try:
            dc = dcs.get(dc_label)
            if dc is None:
                # Report the real problem instead of an AttributeError text.
                message = "Datacenter '{}' not found.".format(dc_label)
                logging.error(message)
                return message, 500, CORS_HEADER
            return dc.getStatus(), 200, CORS_HEADER
        except Exception as ex:
            logging.exception("API error.")
            # str(ex) works on Python 2 and 3; ex.message does not exist on 3.
            return str(ex), 500, CORS_HEADER