RUN apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get -y install git make python python-pip debhelper python3 python3-all python3-pip python3-setuptools && \
- DEBIAN_FRONTEND=noninteractive apt-get -y install wget tox apt-utils && \
+ DEBIAN_FRONTEND=noninteractive apt-get -y install wget tox apt-utils flake8 python-nose python-mock && \
DEBIAN_FRONTEND=noninteractive pip install pip==9.0.3 && \
DEBIAN_FRONTEND=noninteractive pip3 install pip==9.0.3 && \
DEBIAN_FRONTEND=noninteractive pip install -U setuptools setuptools-version-command stdeb && \
#!/bin/sh
-echo "UNITTEST"
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+flake8 osm_ro/wim osm_ro/vim_thread.py --max-line-length 120 \
+ --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,osm_im --ignore W291,W293,E226,E402,W504
+
from itertools import groupby
from operator import itemgetter
from sys import exc_info
-from time import time
+# from time import time
from uuid import uuid1 as generate_uuid
from six import reraise
'LIMIT {:d},{:d}').format(
self.safe_str(wim_account_id),
','.join(type_options),
- group_offset, group_limit
- )
+ group_offset, group_limit)
join = 'vim_wim_actions NATURAL JOIN ({}) AS items'.format(items)
db_results = self.db.get_rows(
'task_index': task_index}
try:
action = self.query_one('vim_wim_actions', WHERE=condition)
- except:
+ except Exception:
actions = self.query('vim_wim_actions', WHERE=condition)
self.logger.error('More then one action found:\n%s',
json.dumps(actions, indent=4))
updates = preprocess_record(
merge_dicts(action, properties, extra=extra))
- num_changes = self.db.update_rows('vim_wim_actions',
- UPDATE=updates, WHERE=condition)
+ num_changes = self.db.update_rows('vim_wim_actions', UPDATE=updates, WHERE=condition)
if num_changes is None:
raise UnexpectedDatabaseError(
if not changes:
return 0
- return self.db.update_rows('instance_actions',
- WHERE={'uuid': uuid}, UPDATE=changes)
+ return self.db.update_rows('instance_actions', WHERE={'uuid': uuid}, UPDATE=changes)
def get_only_vm_with_external_net(self, instance_net_id, **kwargs):
"""Return an instance VM if that is the only VM connected to an
def datacenter_tenant_association(datacenter, tenant):
return {'nfvo_tenant_id': uuid('tenant%d' % tenant),
- 'datacenter_id': uuid('dc%d' % datacenter),
- 'datacenter_tenant_id':
- uuid('dc-account%d%d' % (tenant, datacenter))}
+ 'datacenter_id': uuid('dc%d' % datacenter),
+ 'datacenter_tenant_id': uuid('dc-account%d%d' % (tenant, datacenter))}
def datacenter_set(identifier=0, tenant=0):
from six import reraise
from six.moves import queue
-from . import wan_link_actions, wimconn_odl, wimconn_dynpac # wimconn_tapi
+from . import wan_link_actions, wimconn_odl, wimconn_dynpac # wimconn_tapi
from ..utils import ensure, partition, pipe
from .actions import IGNORE, PENDING, REFRESH
from .errors import (
selected_ports = []
for connection_point in connection_points:
endpoint_id = connection_point.get(self.__SERVICE_ENDPOINT_PARAM)
- port = filter(lambda x: x.get(self.__WAN_SERVICE_ENDPOINT_PARAM)
- == endpoint_id, port_mapping)[0]
+ port = filter(lambda x: x.get(self.__WAN_SERVICE_ENDPOINT_PARAM) == endpoint_id, port_mapping)[0]
wsmpi_json = port.get(self.__WAN_MAPPING_INFO_PARAM)
port_info = json.loads(wsmpi_json)
selected_ports.append(port_info)
}, {
"wan_switch_dpid": selected_ports[1].get(self.__SW_ID_PARAM),
"wan_switch_port": selected_ports[1].get(self.__SW_PORT_PARAM),
- "wan_vlan": connection_points[1].get(self.__ENCAPSULATION_INFO_PARAM).get(self.__VLAN_PARAM)
+ "wan_vlan": connection_points[1].get(self.__ENCAPSULATION_INFO_PARAM).get(self.__VLAN_PARAM)
}],
"bandwidth": kwargs.get(self.__BANDWIDTH_PARAM),
"service_type": service_type,
[testenv:flake8]
basepython = python
deps = flake8
-commands =
- flake8 setup.py
+# TODO for the moment few files are tested.
+commands = flake8 osm_ro/wim --max-line-length 120 \
+ --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,osm_im --ignore W291,W293,E226,E402,W504
[testenv:build]
basepython = python
deps = stdeb
setuptools-version-command
commands = python setup.py --command-packages=stdeb.command bdist_deb
-
+