Merge pull request #133 from mpeuster/master
[osm/vim-emu.git] / src / emuvim / api / zerorpc / compute.py
1 """
2 Copyright (c) 2015 SONATA-NFV and Paderborn University
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28 """
29 Distributed Cloud Emulator (dcemulator)
30 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
31 """
32
33 import logging
34 import threading
35 import zerorpc
36
37 import paramiko
38 import ipaddress
39 import time
40 import gevent
41
42 logging.basicConfig(level=logging.INFO)
43
44
45 class ZeroRpcApiEndpoint(object):
46 """
47 Simple API endpoint that offers a zerorpc-based
48 interface. This interface will be used by the
49 default command line client.
50 It can be used as a reference to implement
51 REST interfaces providing the same semantics,
52 like e.g. OpenStack compute API.
53 """
54
55 def __init__(self, listenip, port):
56 self.dcs = {}
57 self.ip = listenip
58 self.port = port
59 logging.debug("Created API endpoint %s(%s:%d)" % (
60 self.__class__.__name__, self.ip, self.port))
61
62 def connectDatacenter(self, dc):
63 self.dcs[dc.label] = dc
64 logging.info("Connected DC(%s) to API endpoint %s(%s:%d)" % (
65 dc.label, self.__class__.__name__, self.ip, self.port))
66
67 def start(self):
68 thread = threading.Thread(target=self._api_server_thread, args=())
69 thread.daemon = True
70 thread.start()
71 logging.debug("Started API endpoint %s(%s:%d)" % (
72 self.__class__.__name__, self.ip, self.port))
73
74 def _api_server_thread(self):
75 s = zerorpc.Server(MultiDatacenterApi(self.dcs))
76 s.bind("tcp://%s:%d" % (self.ip, self.port))
77 s.run()
78
79
80 class MultiDatacenterApi(object):
81 """
82 Just pass through the corresponding request to the
83 selected data center. Do not implement provisioning
84 logic here because will will have multiple API
85 endpoint implementations at the end.
86 """
87
88 def __init__(self, dcs):
89 self.dcs = dcs
90
91 def compute_action_start(self, dc_label, compute_name, image, network, command):
92 """
93 Start a new compute instance: A docker container (note: zerorpc does not support keyword arguments)
94 :param dc_label: name of the DC
95 :param compute_name: compute container name
96 :param image: image name
97 :param command: command to execute
98 :param network: list of all interface of the vnf, with their parameters (id=id1,ip=x.x.x.x/x),...
99 :return: networks list({"id":"input","ip": "10.0.0.254/8"}, {"id":"output","ip": "11.0.0.254/24"})
100 """
101 # TODO what to return UUID / given name / internal name ?
102 logging.debug("RPC CALL: compute start")
103 try:
104 c = self.dcs.get(dc_label).startCompute(
105 compute_name, image=image, command=command, network=network)
106 #return str(c.name)
107 # return docker inspect dict
108 return c.getStatus()
109 except Exception as ex:
110 logging.exception("RPC error.")
111 return ex.message
112
113 def compute_action_stop(self, dc_label, compute_name):
114 logging.debug("RPC CALL: compute stop")
115 try:
116 return self.dcs.get(dc_label).stopCompute(compute_name)
117 except Exception as ex:
118 logging.exception("RPC error.")
119 return ex.message
120
121 def compute_list(self, dc_label):
122 logging.debug("RPC CALL: compute list")
123 try:
124 if dc_label is None:
125 # return list with all compute nodes in all DCs
126 all_containers = []
127 for dc in self.dcs.itervalues():
128 all_containers += dc.listCompute()
129 return [(c.name, c.getStatus())
130 for c in all_containers]
131 else:
132 # return list of compute nodes for specified DC
133 return [(c.name, c.getStatus())
134 for c in self.dcs.get(dc_label).listCompute()]
135 except Exception as ex:
136 logging.exception("RPC error.")
137 return ex.message
138
139 def compute_status(self, dc_label, compute_name):
140 logging.debug("RPC CALL: compute status")
141 try:
142 return self.dcs.get(
143 dc_label).containers.get(compute_name).getStatus()
144 except Exception as ex:
145 logging.exception("RPC error.")
146 return ex.message
147
148 @zerorpc.stream
149 def compute_profile(self, dc_label, compute_name, kwargs):
150 # note: zerorpc does not support keyword arguments
151
152 ## VIM/dummy gatekeeper's tasks:
153 # start vnf
154 vnf_status = self.compute_action_start( dc_label, compute_name,
155 kwargs.get('image'),
156 kwargs.get('network'),
157 kwargs.get('command'))
158 # start traffic source (with fixed ip addres, no use for now...)
159 psrc_status = self.compute_action_start( dc_label, 'psrc', 'profile_source', [{'id':'output'}], None)
160 # start traffic sink (with fixed ip addres)
161 psink_status = self.compute_action_start(dc_label, 'psink', 'profile_sink', [{'id': 'input'}], None)
162 # link vnf to traffic source
163 DCNetwork = self.dcs.get(dc_label).net
164 DCNetwork.setChain('psrc', compute_name,
165 vnf_src_interface='output',
166 vnf_dst_interface=kwargs.get('input'),
167 cmd='add-flow', weight=None, bidirectional=True)
168 DCNetwork.setChain('psrc', compute_name,
169 vnf_src_interface='output',
170 vnf_dst_interface=kwargs.get('input'),
171 cmd='add-flow', weight=None,
172 match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
173 cookie=10)
174 DCNetwork.setChain( compute_name, 'psink',
175 vnf_src_interface='output',
176 vnf_dst_interface=kwargs.get('input'),
177 cmd='add-flow', weight=None, bidirectional=True)
178 DCNetwork.setChain(compute_name, 'psink',
179 vnf_src_interface='output',
180 vnf_dst_interface=kwargs.get('input'),
181 cmd='add-flow', weight=None,
182 match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
183 cookie=11)
184
185 ## SSM/SP tasks:
186 # start traffic generation
187 '''
188 for nw in psrc_status.get('network'):
189 if nw.get('intf_name') == 'output':
190 psrc_output_ip = unicode(nw['ip'])
191 break
192 dummy_iperf_server_ip = ipaddress.IPv4Address(psrc_output_ip) + 1
193 '''
194 for nw in psink_status.get('network'):
195 if nw.get('intf_name') == 'input':
196 psink_input_ip = nw['ip']
197 break
198
199
200 # get monitor data and analyze
201 vnf_uuid = vnf_status['id']
202 psrc_mgmt_ip = psrc_status['docker_network']
203
204 # query rate
205
206 #need to wait a bit before containers are fully up?
207 time.sleep(2)
208
209 def generate():
210 for rate in [0, 1, 2, 3]:
211 #logging.info('query:{0}'.format(query_cpu))
212
213 output_line = DCNetwork.monitor_agent.profile(psrc_mgmt_ip, rate, psink_input_ip, vnf_uuid)
214 gevent.sleep(0)
215 yield output_line
216
217 # query loss
218
219
220 # create table
221
222 ## VIM/dummy gatekeeper's tasks:
223 # remove vnfs and chain
224 DCNetwork.setChain('psrc', compute_name,
225 vnf_src_interface='output',
226 vnf_dst_interface=kwargs.get('input'),
227 cmd='del-flows', weight=None, bidirectional=True)
228 DCNetwork.setChain('psrc', compute_name,
229 vnf_src_interface='output',
230 vnf_dst_interface=kwargs.get('input'),
231 cmd='del-flows', weight=None,
232 match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
233 cookie=10)
234 DCNetwork.setChain(compute_name, 'psink',
235 vnf_src_interface='output',
236 vnf_dst_interface=kwargs.get('input'),
237 cmd='del-flows', weight=None, bidirectional=True)
238 DCNetwork.setChain(compute_name, 'psink',
239 vnf_src_interface='output',
240 vnf_dst_interface=kwargs.get('input'),
241 cmd='del-flows', weight=None,
242 match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
243 cookie=11)
244 self.compute_action_stop(dc_label, compute_name)
245 self.compute_action_stop(dc_label, 'psink')
246 self.compute_action_stop(dc_label, 'psrc')
247
248 return generate()
249
250 def datacenter_list(self):
251 logging.debug("RPC CALL: datacenter list")
252 try:
253 return [d.getStatus() for d in self.dcs.itervalues()]
254 except Exception as ex:
255 logging.exception("RPC error.")
256 return ex.message
257
258 def datacenter_status(self, dc_label):
259 logging.debug("RPC CALL: datacenter status")
260 try:
261 return self.dcs.get(dc_label).getStatus()
262 except Exception as ex:
263 logging.exception("RPC error.")
264 return ex.message
265
266