merge master and fix SDN chaining unit test
[osm/vim-emu.git] / src / emuvim / api / zerorpc / compute.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5
6 import logging
7 import threading
8 import zerorpc
9
10 import paramiko
11 import ipaddress
12 import time
13 import gevent
14
# Configure the root logger when this module is imported. With level INFO the
# logging.debug() calls in this module are not emitted unless logging has
# already been configured differently elsewhere (basicConfig does nothing if
# the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
16
17
class ZeroRpcApiEndpoint(object):
    """
    zerorpc-based API endpoint of the emulator.

    The default command line client talks to this interface. It also
    serves as a reference for REST interfaces that should provide the
    same semantics, e.g. an OpenStack-like compute API.
    """

    def __init__(self, listenip, port):
        """
        :param listenip: IP address the RPC server binds to
        :param port: TCP port the RPC server binds to (formatted with %d)
        """
        # label -> data center; filled by connectDatacenter()
        self.dcs = {}
        self.ip = listenip
        self.port = port
        created_msg = "Created API endpoint %s(%s:%d)" % (
            self.__class__.__name__, self.ip, self.port)
        logging.debug(created_msg)

    def connectDatacenter(self, dc):
        """
        Register a data center under its label so that it becomes
        reachable through this endpoint.

        :param dc: data center object; must provide a `label` attribute
        """
        label = dc.label
        self.dcs[label] = dc
        connected_msg = "Connected DC(%s) to API endpoint %s(%s:%d)" % (
            label, self.__class__.__name__, self.ip, self.port)
        logging.info(connected_msg)

    def start(self):
        """
        Run the RPC server in a background daemon thread and return
        immediately.
        """
        server_thread = threading.Thread(target=self._api_server_thread)
        # daemon thread: does not keep the interpreter alive on exit
        server_thread.daemon = True
        server_thread.start()
        started_msg = "Started API endpoint %s(%s:%d)" % (
            self.__class__.__name__, self.ip, self.port)
        logging.debug(started_msg)

    def _api_server_thread(self):
        """
        Thread body: expose a MultiDatacenterApi over zerorpc on
        tcp://<ip>:<port> and serve requests until the server stops.
        """
        rpc_server = zerorpc.Server(MultiDatacenterApi(self.dcs))
        bind_address = "tcp://%s:%d" % (self.ip, self.port)
        rpc_server.bind(bind_address)
        rpc_server.run()
51
52
class MultiDatacenterApi(object):
    """
    Just pass through the corresponding request to the
    selected data center. Do not implement provisioning
    logic here because we will have multiple API
    endpoint implementations at the end.

    All methods except compute_profile() catch every exception, log it
    and return the error text instead of the regular result.
    """

    # flow match used for the dedicated profiling flows
    # (IPv4, IP protocol 17 = UDP, destination port 5001)
    _PROFILE_MATCH = 'dl_type=0x0800,nw_proto=17,udp_dst=5001'

    def __init__(self, dcs):
        """
        :param dcs: dict mapping data center labels to data center objects
                    (stored by reference, not copied)
        """
        self.dcs = dcs

    @staticmethod
    def _error_text(ex):
        """
        Return a printable description of exception `ex`.

        Replaces `ex.message`, which is deprecated since Python 2.6, gone
        in Python 3 and empty for exceptions raised with several arguments.
        """
        try:
            return str(ex)
        except UnicodeError:
            # Python 2: str() fails on non-ASCII unicode messages
            return repr(ex)

    def compute_action_start(self, dc_label, compute_name, image, network, command):
        """
        Start a new compute instance: A docker container (note: zerorpc does not support keyword arguments)
        :param dc_label: name of the DC
        :param compute_name: compute container name
        :param image: image name
        :param network: list of all interface of the vnf, with their parameters (id=id1,ip=x.x.x.x/x),...
        :param command: command to execute
        :return: status of the started container as reported by its
                 getStatus(), or the error text if starting failed
        """
        # TODO what to return UUID / given name / internal name ?
        logging.debug("RPC CALL: compute start")
        try:
            c = self.dcs.get(dc_label).startCompute(
                compute_name, image=image, command=command, network=network)
            return c.getStatus()
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_text(ex)

    def compute_action_stop(self, dc_label, compute_name):
        """
        Stop a compute instance.
        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: result of the data center's stopCompute(), or the error
                 text if stopping failed
        """
        logging.debug("RPC CALL: compute stop")
        try:
            return self.dcs.get(dc_label).stopCompute(compute_name)
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_text(ex)

    def compute_list(self, dc_label):
        """
        List compute instances.
        :param dc_label: name of the DC, or None to list the compute
                         instances of all connected DCs
        :return: list of (name, status) tuples, or the error text on failure
        """
        logging.debug("RPC CALL: compute list")
        try:
            if dc_label is None:
                # return list with all compute nodes in all DCs
                # (values() instead of itervalues(): works on Python 2 and 3)
                containers = []
                for dc in self.dcs.values():
                    containers += dc.listCompute()
            else:
                # return list of compute nodes for specified DC
                containers = self.dcs.get(dc_label).listCompute()
            return [(c.name, c.getStatus()) for c in containers]
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_text(ex)

    def compute_status(self, dc_label, compute_name):
        """
        Get the status of a single compute instance.
        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: the container's getStatus() result, or the error text
                 (e.g. when the DC or the container is unknown)
        """
        logging.debug("RPC CALL: compute status")
        try:
            return self.dcs.get(
                dc_label).containers.get(compute_name).getStatus()
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_text(ex)

    def _set_profile_chains(self, net, compute_name, vnf_input, cmd):
        """
        Apply `cmd` ('add-flow' or 'del-flows') to the profiling chains
        psrc -> vnf and vnf -> psink. For each hop a bidirectional chain
        and a dedicated flow for the profiling traffic (cookie 10 resp. 11)
        is handled, in that order.
        """
        # NOTE(review): the vnf -> psink hop uses the same interface
        # arguments as the psrc -> vnf hop (source 'output', destination
        # kwargs['input']); this mirrors the original code - confirm that
        # it is intended.
        for src, dst, cookie in (('psrc', compute_name, 10),
                                 (compute_name, 'psink', 11)):
            net.setChain(src, dst,
                         vnf_src_interface='output',
                         vnf_dst_interface=vnf_input,
                         cmd=cmd, weight=None, bidirectional=True)
            net.setChain(src, dst,
                         vnf_src_interface='output',
                         vnf_dst_interface=vnf_input,
                         cmd=cmd, weight=None,
                         match=self._PROFILE_MATCH,
                         cookie=cookie)

    def _teardown_profile(self, dc_label, compute_name, net, vnf_input, started):
        """
        Undo compute_profile(): remove the chains (only if `net` is set,
        i.e. chain setup was attempted) and stop those containers listed
        in `started`.
        """
        if net is not None:
            try:
                self._set_profile_chains(net, compute_name, vnf_input, 'del-flows')
            except Exception:
                # best effort: the containers below must still be stopped
                logging.exception("Could not remove profiling flows.")
        for name in (compute_name, 'psink', 'psrc'):
            # never stop a container this profiling run did not start itself
            if name in started:
                self.compute_action_stop(dc_label, name)

    @zerorpc.stream
    def compute_profile(self, dc_label, compute_name, image, kwargs):
        """
        Profile a VNF: start it together with a traffic source ('psrc')
        and a traffic sink ('psink'), chain psrc -> vnf -> psink, and
        stream one monitoring result per tested rate. Chains and
        containers are removed again once the stream ends or fails.

        :param dc_label: name of the DC
        :param compute_name: compute container name of the VNF
        :param image: image name of the VNF
        :param kwargs: dict with the optional keys 'network', 'command'
                       (passed to compute_action_start) and 'input'
                       (passed as vnf_dst_interface when chaining)
        :return: generator yielding the result of monitor_agent.profile()
                 for each of the rates 0, 1, 2, 3
        :raises RuntimeError: if a container cannot be started or the sink
                 has no 'input' interface
        """
        # note: zerorpc does not support keyword arguments
        logging.debug("RPC CALL: compute profile")
        vnf_input = kwargs.get('input')
        started = []    # containers launched by this call (for cleanup)
        dc_net = None   # set as soon as chain setup is attempted

        def launch(name, img, network, command):
            # compute_action_start() reports failures as an error string
            status = self.compute_action_start(
                dc_label, name, img, network, command)
            if isinstance(status, str):
                raise RuntimeError(
                    "profiling aborted, could not start '%s': %s" % (name, status))
            started.append(name)
            return status

        try:
            ## VIM/dummy gatekeeper's tasks:
            # start vnf
            vnf_status = launch(compute_name, image,
                                kwargs.get('network'),
                                kwargs.get('command'))
            # start traffic source (with fixed ip address, no use for now...)
            psrc_status = launch('psrc', 'profile_source', [{'id': 'output'}], None)
            # start traffic sink (with fixed ip address)
            psink_status = launch('psink', 'profile_sink', [{'id': 'input'}], None)

            # link vnf to traffic source and traffic sink
            dc_net = self.dcs.get(dc_label).net
            self._set_profile_chains(dc_net, compute_name, vnf_input, 'add-flow')

            ## SSM/SP tasks:
            # address of the sink interface that receives the traffic
            for nw in psink_status.get('network') or []:
                if nw.get('intf_name') == 'input':
                    psink_input_ip = nw['ip']
                    break
            else:
                raise RuntimeError(
                    "profiling aborted, 'psink' has no 'input' interface")

            # get monitor data and analyze
            vnf_uuid = vnf_status['id']
            psrc_mgmt_ip = psrc_status['docker_network']

            # need to wait a bit before containers are fully up?
            # NOTE(review): time.sleep() may block the gevent loop of the
            # RPC server; gevent.sleep(2) might be preferable - confirm.
            time.sleep(2)
        except Exception:
            # do not leave containers or flows behind when the setup fails
            self._teardown_profile(dc_label, compute_name, dc_net, vnf_input, started)
            raise

        def generate():
            try:
                # query rate
                for rate in [0, 1, 2, 3]:
                    output_line = dc_net.monitor_agent.profile(
                        psrc_mgmt_ip, rate, psink_input_ip, vnf_uuid)
                    # give other greenlets a chance to run between measurements
                    gevent.sleep(0)
                    yield output_line
            finally:
                ## VIM/dummy gatekeeper's tasks:
                # remove vnfs and chain. This must happen inside the
                # generator: the measurements above only run while the
                # stream is consumed, i.e. after compute_profile() returned.
                self._teardown_profile(dc_label, compute_name, dc_net, vnf_input, started)

        return generate()

    def datacenter_list(self):
        """
        :return: list with the getStatus() result of every connected DC,
                 or the error text on failure
        """
        logging.debug("RPC CALL: datacenter list")
        try:
            return [d.getStatus() for d in self.dcs.values()]
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_text(ex)

    def datacenter_status(self, dc_label):
        """
        :param dc_label: name of the DC
        :return: getStatus() result of the DC, or the error text
                 (e.g. when the DC is unknown)
        """
        logging.debug("RPC CALL: datacenter status")
        try:
            return self.dcs.get(dc_label).getStatus()
        except Exception as ex:
            logging.exception("RPC error.")
            return self._error_text(ex)
237
238