2 Copyright (c) 2015 SONATA-NFV
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
29 Distributed Cloud Emulator (dcemulator)
37 logging
.basicConfig(level
=logging
.INFO
)
40 class ZeroRpcApiEndpointDCNetwork(object):
42 Simple API endpoint that offers a zerorpc-based
43 interface. This interface will be used by the
44 default command line client.
45 It can be used as a reference to implement
46 REST interfaces providing the same semantics,
47 like e.g. OpenStack compute API.
50 def __init__(self
, listenip
, port
, DCNetwork
=None):
52 self
.connectDCNetwork(DCNetwork
)
55 logging
.debug("Created monitoring API endpoint %s(%s:%d)" % (
56 self
.__class
__.__name
__, self
.ip
, self
.port
))
58 def connectDCNetwork(self
, net
):
60 logging
.info("Connected DCNetwork to API endpoint %s(%s:%d)" % (
61 self
.__class
__.__name
__, self
.ip
, self
.port
))
64 thread
= threading
.Thread(target
=self
._api
_server
_thread
, args
=())
67 logging
.debug("Started API endpoint %s(%s:%d)" % (
68 self
.__class
__.__name
__, self
.ip
, self
.port
))
70 def _api_server_thread(self
):
71 s
= zerorpc
.Server(DCNetworkApi(self
.net
))
72 s
.bind("tcp://%s:%d" % (self
.ip
, self
.port
))
76 logging
.info("Stop the monitoring API endpoint")
80 class DCNetworkApi(object):
82 The networking and monitoring commands need the scope of the
83 whole DC network to find the requested vnf. So this API is intended
84 to work with a DCNetwork.
85 Just pass through the corresponding request to the
86 selected data center network. Do not implement provisioning
87 logic here because will will have multiple API
88 endpoint implementations at the end.
91 def __init__(self
, net
):
94 def network_action_start(self
, vnf_src_name
, vnf_dst_name
, kwargs
):
95 # call DCNetwork method, not really datacenter specific API for now...
96 # provided dc name needs to be part of API endpoint
97 # no check if vnfs are really connected to this datacenter...
98 logging
.debug("RPC CALL: network chain start")
100 c
= self
.net
.setChain(
101 vnf_src_name
, vnf_dst_name
,
102 vnf_src_interface
=kwargs
.get('vnf_src_interface'),
103 vnf_dst_interface
=kwargs
.get('vnf_dst_interface'),
105 weight
=kwargs
.get('weight'),
106 match
=kwargs
.get('match'),
107 bidirectional
=kwargs
.get('bidirectional'),
108 cookie
=kwargs
.get('cookie'))
110 except Exception as ex
:
111 logging
.exception("RPC error.")
114 def network_action_stop(self
, vnf_src_name
, vnf_dst_name
, kwargs
):
115 # call DCNetwork method, not really datacenter specific API for now...
116 # provided dc name needs to be part of API endpoint
117 # no check if vnfs are really connected to this datacenter...
118 logging
.debug("RPC CALL: network chain stop")
120 c
= self
.net
.setChain(
121 vnf_src_name
, vnf_dst_name
,
122 vnf_src_interface
=kwargs
.get('vnf_src_interface'),
123 vnf_dst_interface
=kwargs
.get('vnf_dst_interface'),
125 weight
=kwargs
.get('weight'),
126 match
=kwargs
.get('match'),
127 bidirectional
=kwargs
.get('bidirectional'),
128 cookie
=kwargs
.get('cookie'))
130 except Exception as ex
:
131 logging
.exception("RPC error.")
134 # setup the rate measurement for a vnf interface
135 def setup_metric(self
, vnf_name
, vnf_interface
, metric
):
136 logging
.debug("RPC CALL: setup metric")
138 c
= self
.net
.monitor_agent
.setup_metric(vnf_name
, vnf_interface
, metric
)
140 except Exception as ex
:
141 logging
.exception("RPC error.")
144 # remove the rate measurement for a vnf interface
145 def stop_metric(self
, vnf_name
, vnf_interface
, metric
):
146 logging
.debug("RPC CALL: stop metric")
148 c
= self
.net
.monitor_agent
.stop_metric(vnf_name
, vnf_interface
, metric
)
150 except Exception as ex
:
151 logging
.exception("RPC error.")
154 # setup the flow metrics measurement
155 def setup_flow(self
, vnf_name
, vnf_interface
, metric
, cookie
):
156 logging
.debug("RPC CALL: setup flow")
158 c
= self
.net
.monitor_agent
.setup_flow(vnf_name
, vnf_interface
, metric
, cookie
)
160 except Exception as ex
:
161 logging
.exception("RPC error.")
164 # remove the flow metrics measurement
165 def stop_flow(self
, vnf_name
, vnf_interface
, metric
, cookie
):
166 logging
.debug("RPC CALL: stop flow")
168 c
= self
.net
.monitor_agent
.stop_flow(vnf_name
, vnf_interface
, metric
, cookie
)
170 except Exception as ex
:
171 logging
.exception("RPC error.")
174 # do prometheus query
175 def prometheus(self
, dc_label
, vnf_name
, vnf_interface
, query
):
176 logging
.debug("RPC CALL: query prometheus")
177 vnf_status
= self
.net
.dcs
.get(dc_label
).containers
.get(vnf_name
).getStatus()
178 uuid
= vnf_status
['id']
179 query
= query
.replace('<uuid>', uuid
)
180 #if needed, replace interface id with emu-intfs name
181 # query = query.replace('<intf>', vnf_interface)
182 logging
.info('query: {0}'.format(query
))
184 c
= self
.net
.monitor_agent
.query_Prometheus(query
)
186 except Exception as ex
:
187 logging
.exception("RPC error.")