"""
Distributed Cloud Emulator (dcemulator)
(c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
"""
# Configure the root logger when this module is imported. At INFO level the
# logging.debug(...) calls in this module are not emitted unless the level is
# lowered elsewhere.
logging.basicConfig(level=logging.INFO)
class ZeroRpcApiEndpoint(object):
    """
    Simple API endpoint that offers a zerorpc-based
    interface. This interface will be used by the
    default command line client.
    It can be used as a reference to implement
    REST interfaces providing the same semantics,
    like e.g. OpenStack compute API.
    """

    def __init__(self, listenip, port):
        """
        Remember the listen address and start with no data centers attached.

        :param listenip: address the RPC server binds to (formatted with %s)
        :param port: TCP port the RPC server binds to (formatted with %d)
        """
        # NOTE(review): the three attribute assignments are not visible in
        # the reviewed listing; they are inferred from how self.dcs, self.ip
        # and self.port are used by the other methods -- confirm.
        self.dcs = {}
        self.ip = listenip
        self.port = port
        logging.debug("Created API endpoint %s(%s:%d)",
                      self.__class__.__name__, self.ip, self.port)

    def connectDatacenter(self, dc):
        """
        Register a data center under its label so RPC calls can address it.

        :param dc: data center object; only its ``label`` attribute is read
            here. A data center with the same label replaces the earlier one.
        """
        self.dcs[dc.label] = dc
        logging.info("Connected DC(%s) to API endpoint %s(%s:%d)",
                     dc.label, self.__class__.__name__, self.ip, self.port)

    def start(self):
        """
        Run the RPC server in a background thread and return immediately.

        NOTE(review): the ``def`` line of this method and the two lines
        between thread creation and the log call are missing from the
        reviewed listing. The name ``start`` is inferred from the
        "Started API endpoint" message; the daemon flag and the
        ``thread.start()`` call are assumptions -- confirm.
        """
        thread = threading.Thread(target=self._api_server_thread, args=())
        # Assumed daemon so a serving thread does not block interpreter exit.
        thread.daemon = True
        thread.start()
        logging.debug("Started API endpoint %s(%s:%d)",
                      self.__class__.__name__, self.ip, self.port)

    def _api_server_thread(self):
        """
        Thread body: expose a MultiDatacenterApi over zerorpc on ip:port.

        The API object receives the same ``self.dcs`` dict, so data centers
        connected after the server was created are visible to it as well.
        """
        s = zerorpc.Server(MultiDatacenterApi(self.dcs))
        s.bind("tcp://%s:%d" % (self.ip, self.port))
        # NOTE(review): the line after bind() is missing from the reviewed
        # listing; serving requests with run() is assumed -- confirm.
        s.run()
class MultiDatacenterApi(object):
    """
    Just pass through the corresponding request to the
    selected data center. Do not implement provisioning
    logic here because we will have multiple API
    endpoint implementations at the end.
    """

    # OpenFlow match used for the second chain of every profiling link.
    _PROFILE_MATCH = 'dl_type=0x0800,nw_proto=17,udp_dst=5001'

    def __init__(self, dcs):
        """
        :param dcs: mapping of data center label -> data center object
        """
        # NOTE(review): this assignment is not visible in the reviewed
        # listing; it is inferred from every method reading self.dcs.
        self.dcs = dcs

    def compute_action_start(self, dc_label, compute_name, image, network, command):
        """
        Start a new compute instance: A docker container (note: zerorpc does not support keyword arguments)
        :param dc_label: name of the DC
        :param compute_name: compute container name
        :param image: image name
        :param command: command to execute
        :param network: list of all interface of the vnf, with their parameters (id=id1,ip=x.x.x.x/x),...
        :return: networks list({"id":"input","ip": "10.0.0.254/8"}, {"id":"output","ip": "11.0.0.254/24"})
        """
        # TODO what to return UUID / given name / internal name ?
        logging.debug("RPC CALL: compute start")
        try:
            c = self.dcs.get(dc_label).startCompute(
                compute_name, image=image, command=command, network=network)
            # return docker inspect dict
            # NOTE(review): the return line is missing from the reviewed
            # listing; getStatus() is inferred from compute_profile(), which
            # indexes this result like a dict -- confirm.
            return c.getStatus()
        except Exception:
            # An unknown dc_label ends up here too (attribute access on None).
            logging.exception("RPC error.")
            # NOTE(review): the listing shows nothing after the log call, so
            # the error path returns None here; restore any error value the
            # real file returns to the RPC client.

    def compute_action_stop(self, dc_label, compute_name):
        """
        Stop a compute instance.

        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: whatever the data center's stopCompute() returns; None after
            a logged failure (see the note in compute_action_start)
        """
        logging.debug("RPC CALL: compute stop")
        try:
            return self.dcs.get(dc_label).stopCompute(compute_name)
        except Exception:
            logging.exception("RPC error.")

    def compute_list(self, dc_label):
        """
        List compute instances as (name, status) tuples.

        :param dc_label: name of the DC, or None for all DCs
        :return: list of (name, status) tuples; None after a logged failure
        """
        logging.debug("RPC CALL: compute list")
        try:
            # NOTE(review): the branch condition is missing from the reviewed
            # listing; "dc_label is None" is inferred from the two comments.
            if dc_label is None:
                # return list with all compute nodes in all DCs
                all_containers = []
                # values() instead of the Python-2-only itervalues()
                for dc in self.dcs.values():
                    all_containers += dc.listCompute()
                return [(c.name, c.getStatus())
                        for c in all_containers]
            # return list of compute nodes for specified DC
            return [(c.name, c.getStatus())
                    for c in self.dcs.get(dc_label).listCompute()]
        except Exception:
            logging.exception("RPC error.")

    def compute_status(self, dc_label, compute_name):
        """
        Return the status of one compute instance.

        :param dc_label: name of the DC
        :param compute_name: compute container name
        :return: result of the container's getStatus(); None after a logged
            failure (unknown DC or container included)
        """
        logging.debug("RPC CALL: compute status")
        try:
            return self.dcs.get(
                dc_label).containers.get(compute_name).getStatus()
        except Exception:
            logging.exception("RPC error.")

    def _set_profile_chains(self, net, src, dst, dst_interface, cmd):
        """Apply `cmd` to the two chains (bidirectional + UDP match) src -> dst."""
        net.setChain(src, dst,
                     vnf_src_interface='output',
                     vnf_dst_interface=dst_interface,
                     cmd=cmd, weight=None, bidirectional=True)
        # NOTE(review): in the reviewed listing the line following the match
        # argument is missing; it may carry further keyword arguments.
        net.setChain(src, dst,
                     vnf_src_interface='output',
                     vnf_dst_interface=dst_interface,
                     cmd=cmd, weight=None,
                     match=self._PROFILE_MATCH)

    def compute_profile(self, dc_label, compute_name, kwargs):
        """
        Profile a VNF: start it together with a traffic source ('psrc') and a
        traffic sink ('psink'), chain psrc -> VNF -> psink, query the
        monitoring agent for rates 0..3 and tear everything down again.

        :param dc_label: name of the DC
        :param compute_name: compute container name of the VNF
        :param kwargs: plain dict (zerorpc does not support keyword
            arguments); keys read here: 'network', 'command', 'input'
            and -- assumed -- 'image'
        :return: list with one monitor_agent.profile() result per rate.
            NOTE(review): the listing does not show how the results are
            handed back to the caller -- confirm.
        """
        # note: zerorpc does not support keyword arguments

        ## VIM/dummy gatekeeper's tasks:
        # NOTE(review): the image argument sits on a line missing from the
        # reviewed listing; kwargs.get('image') is assumed -- confirm.
        vnf_status = self.compute_action_start(dc_label, compute_name,
                                               kwargs.get('image'),
                                               kwargs.get('network'),
                                               kwargs.get('command'))
        # start traffic source (with fixed ip address, no use for now...)
        psrc_status = self.compute_action_start(
            dc_label, 'psrc', 'profile_source', [{'id': 'output'}], None)
        # start traffic sink (with fixed ip address)
        psink_status = self.compute_action_start(
            dc_label, 'psink', 'profile_sink', [{'id': 'input'}], None)
        net = self.dcs.get(dc_label).net
        vnf_input = kwargs.get('input')

        results = []
        try:
            # link vnf to traffic source, then to traffic sink
            self._set_profile_chains(net, 'psrc', compute_name, vnf_input, 'add-flow')
            self._set_profile_chains(net, compute_name, 'psink', vnf_input, 'add-flow')

            # start traffic generation
            # NOTE(review): the next four lines appear un-tokenized in the
            # reviewed listing, which suggests they were inside a disabled
            # (string-quoted) block; kept as comments -- confirm.
            # for nw in psrc_status.get('network'):
            #     if nw.get('intf_name') == 'output':
            #         psrc_output_ip = unicode(nw['ip'])
            # dummy_iperf_server_ip = ipaddress.IPv4Address(psrc_output_ip) + 1

            # Default to None so a sink without an 'input' interface does not
            # end in a NameError further down.
            psink_input_ip = None
            for nw in psink_status.get('network'):
                if nw.get('intf_name') == 'input':
                    psink_input_ip = nw['ip']

            # get monitor data and analyze
            vnf_uuid = vnf_status['id']
            psrc_mgmt_ip = psrc_status['docker_network']

            #need to wait a bit before containers are fully up?
            # NOTE(review): the lines following this question are missing
            # from the reviewed listing (possibly a delay) -- confirm.

            for rate in [0, 1, 2, 3]:
                #logging.info('query:{0}'.format(query_cpu))
                output_line = net.monitor_agent.profile(
                    psrc_mgmt_ip, rate, psink_input_ip, vnf_uuid)
                results.append(output_line)
        finally:
            ## VIM/dummy gatekeeper's tasks:
            # remove vnfs and chain -- in a finally block so a failed
            # profiling run does not leave flows and containers behind
            self._set_profile_chains(net, 'psrc', compute_name, vnf_input, 'del-flows')
            self._set_profile_chains(net, compute_name, 'psink', vnf_input, 'del-flows')
            self.compute_action_stop(dc_label, compute_name)
            self.compute_action_stop(dc_label, 'psink')
            self.compute_action_stop(dc_label, 'psrc')
        return results

    def datacenter_list(self):
        """
        :return: list with the getStatus() result of every connected DC;
            None after a logged failure
        """
        logging.debug("RPC CALL: datacenter list")
        try:
            # values() instead of the Python-2-only itervalues()
            return [d.getStatus() for d in self.dcs.values()]
        except Exception:
            logging.exception("RPC error.")

    def datacenter_status(self, dc_label):
        """
        :param dc_label: name of the DC
        :return: getStatus() result of that DC; None after a logged failure
            (unknown label included)
        """
        logging.debug("RPC CALL: datacenter status")
        try:
            return self.dcs.get(dc_label).getStatus()
        except Exception:
            logging.exception("RPC error.")