2 # -*- coding: utf-8 -*-
4 # Copyright 2020 ArctosLabs Scandinavia AB
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
21 from pathlib
import Path
24 from osm_common
import dbmemory
, dbmongo
, msglocal
, msgkafka
26 from osm_pla
.config
.config
import Config
27 from osm_pla
.placement
.mznplacement
import MznPlacementConductor
28 from osm_pla
.placement
.mznplacement
import NsPlacementDataFactory
# Price list file locations. get_placement reads them as attributes of Server:
# pil_price_list_file is handed to _get_pil_info,
# vnf_price_list_file is handed to _get_vnf_price_list.
pil_price_list_file = Path('/placement/pil_price_list.yaml')
vnf_price_list_file = Path('/placement/vnf_price_list.yaml')
def __init__(self, config: Config, loop=None):
    """
    Set up logging, the event loop, the database and the message bus.

    Setup failures are logged and not re-raised; ``self.db`` and/or
    ``self.msgBus`` may then still be ``None`` or unconnected.

    :param config: configuration giving access to the 'database' and 'message' sections
    :param loop: event loop to use; ``asyncio.get_event_loop()`` when omitted
    """
    self.log = logging.getLogger("pla.server")
    # Defined up front: the except clause below swallows setup errors, so
    # these attributes must exist even if no driver could be selected.
    self.db = None
    self.msgBus = None
    self.config = config
    self.loop = loop or asyncio.get_event_loop()

    try:
        db_driver = config.get('database', 'driver')
        if db_driver == "mongo":
            self.db = dbmongo.DbMongo()
        elif db_driver == "memory":
            self.db = dbmemory.DbMemory()
        else:
            raise Exception("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
        self.db.db_connect(config.get('database'))

        msg_driver = config.get('message', 'driver')
        if msg_driver == "local":
            self.msgBus = msglocal.MsgLocal()
        elif msg_driver == "kafka":
            self.msgBus = msgkafka.MsgKafka()
        else:
            raise Exception("Invalid message bus driver {}".format(msg_driver))
        # Pass on the resolved loop, not the raw argument, which is None
        # whenever the caller relied on the default.
        self.msgBus.loop = self.loop
        self.msgBus.connect(config.get('message'))
    except Exception as e:
        # Covers database as well as message bus failures, despite the wording.
        self.log.exception("kafka setup error. Exception: {}".format(e))
66 def _get_nslcmop(self
, nsdlcmop_id
):
69 :return: nslcmop from database corresponding to nslcmop_id
71 db_filter
= {"_id": nsdlcmop_id
}
72 nslcmop
= self
.db
.get_one("nslcmops", db_filter
)
75 def _get_projects(self
):
77 :return: project name to project id mapping
79 projects
= self
.db
.get_list("projects")
80 return {project
['_id']: project
['name'] for project
in projects
}
82 def _get_nsd(self
, nsd_id
):
85 :return: nsd from database corresponding to nsd_id
87 db_filter
= {"_id": nsd_id
}
88 return self
.db
.get_one("nsds", db_filter
)
90 def _get_vim_accounts(self
, vim_account_ids
):
92 :param vim_account_ids: list of VIM account ids
93 :return: list of vim account entries from database corresponding to list in vim_accounts_id
95 db_filter
= {"_id": vim_account_ids
}
96 return self
.db
.get_list("vim_accounts", db_filter
)
def _read_vnf_price_list(self, price_list_file_path):
    """
    Load the vnf price list configuration file.

    :param price_list_file_path: location of the price list file
    :return: content of the first YAML document found in the file
    """
    with open(str(price_list_file_path)) as stream:
        documents = yaml.safe_load_all(stream)
        # Only the first document is used; it is pulled while the file is open.
        first_document = next(documents)
    return first_document
108 def _price_list_with_project(self
, price_list
):
110 Figure out if this price list is with project or not.
111 Note: to handle the unlikely event that a project is called 'prices' we do not simply check if 'prices'
112 is in the dict keys for a price list sequence but rather go down one step in the nesting
113 in which we either have
114 1) 'prices:{vim_url:...}' if prices are also per project, or
115 2) '{vim_url:...}' if prices are only per vim
118 :return: True if project part of price list, else False
120 price_list_entry_keys
= set(price_list
[0].keys())
121 price_list_entry_keys
.remove('vnfd')
122 pl_key
= price_list_entry_keys
.pop()
123 entry_to_check
= price_list
[0][pl_key
][0].keys()
124 return True if 'prices' in entry_to_check
else False
126 def _get_vnf_price_list(self
, price_list_file_path
, project_name
=None):
128 read vnf price list configuration file, determine its type and reformat content accordingly
130 :param price_list_file_path:
132 :return: dictionary formatted as {'<vnfd>': {'<vim-url>':'<price>'}}
134 price_list_data
= self
._read
_vnf
_price
_list
(price_list_file_path
)
135 if self
._price
_list
_with
_project
(price_list_data
):
137 for i
in price_list_data
:
138 price_data
= i
[project_name
] if type(i
[project_name
]) is dict else i
[project_name
][0]
139 res_component
= {i
['vim_name']: i
['price'] for i
in price_data
['prices']}
140 res
.update({i
['vnfd']: res_component
})
143 return {i
['vnfd']: {i1
['vim_name']: i1
['price'] for i1
in i
['prices']} for i
in price_list_data
}
def _get_pil_info(self, pil_info_file_path):
    """
    read and return pil information from file

    :param pil_info_file_path: Path to pil_info file
    :return: pil configuration file content as Python object
    """
    with open(str(pil_info_file_path)) as pil_fd:
        data = yaml.safe_load_all(pil_fd)
        # safe_load_all parses lazily, so the first document must be pulled
        # while the file is still open; _read_vnf_price_list does the same.
        return next(data)
async def get_placement(self, nslcmop_id):
    """
    - Collects and prepares placement information.
    - Requests placement computation.
    - Formats and distributes the placement result.

    Note: exceptions result in empty response message

    :param nslcmop_id: id of the nslcmop entry to compute a placement for
    :return: None; the result is written to topic "pla" with key "placement"
    """
    # Bound before the try block so the reply in `finally` is well-formed
    # (an empty placement) whatever goes wrong during the computation.
    vnf_placement = []
    try:
        nslcmop = self._get_nslcmop(nslcmop_id)
        nsd = self._get_nsd(nslcmop['operationParams']['nsdId'])
        self.log.info("nsd: {}".format(nsd))
        projects = self._get_projects()
        self.log.info("projects: {}".format(projects))
        nslcmop_project = nslcmop['_admin']['projects_read'][0]
        self.log.info("nslcmop_project: {}".format(nslcmop_project))
        valid_vim_accounts = nslcmop['operationParams']['validVimAccounts']
        vim_accounts_data = self._get_vim_accounts(valid_vim_accounts)
        vims_information = {vim['name']: vim['_id'] for vim in vim_accounts_data}
        # projects maps project id to project name; the price list is looked up by name.
        price_list = self._get_vnf_price_list(Server.vnf_price_list_file, projects[nslcmop_project])
        pil_info = self._get_pil_info(Server.pil_price_list_file)
        pinning = nslcmop['operationParams'].get('vnf')
        self.log.info("pinning: {}".format(pinning))
        order_constraints = nslcmop['operationParams'].get('placement-constraints')
        self.log.info("order constraints: {}".format(order_constraints))

        # NOTE(review): price_list, nsd and pil_info are computed above for this
        # call; their positional order is assumed - confirm against the
        # NsPlacementDataFactory signature.
        nspd = NsPlacementDataFactory(vims_information,
                                      price_list,
                                      nsd,
                                      pil_info,
                                      pinning, order_constraints).create_ns_placement_data()

        vnf_placement = MznPlacementConductor(self.log).do_placement_computation(nspd)

    except Exception as e:
        # Note: there is no cure for failure so we have a catch-all clause here
        self.log.exception("PLA fault. Exception: {}".format(e))
    finally:
        await self.msgBus.aiowrite("pla", "placement",
                                   {'placement': {'vnf': vnf_placement, 'nslcmopId': nslcmop_id}})
def handle_kafka_command(self, topic, command, params):
    """
    Callback for incoming messages; schedules a placement computation.

    Only the "get_placement" command on topic "pla" is acted upon; every
    other message is just logged.

    :param topic: topic the message arrived on
    :param command: command carried by the message
    :param params: message payload; its 'nslcmopId' entry is read via ``get``
    """
    self.log.info("Kafka msg arrived: {} {} {}".format(topic, command, params))
    if topic != "pla" or command != "get_placement":
        return
    placement_coro = self.get_placement(params.get('nslcmopId'))
    self.loop.create_task(placement_coro)
async def kafka_read(self):
    """
    Read messages from the message bus and pass them to handle_kafka_command.

    A failing read is logged and attempted again after a 5 second pause.
    """
    self.log.info("Task kafka_read start")
    # handle_kafka_command only acts on messages arriving on the "pla" topic.
    topics = "pla"
    # Retry loop: pausing after a failed read only makes sense if the read
    # is attempted again afterwards.
    while True:
        try:
            await self.msgBus.aioread(topics, self.loop, self.handle_kafka_command)
        except Exception as e:
            self.log.error("kafka read error. Exception: {}".format(e))
            # No loop= argument: asyncio.sleep() lost it in Python 3.10 and
            # uses the running event loop.
            await asyncio.sleep(5)
217 self
.loop
.run_until_complete(self
.kafka_read())
221 self
.msgBus
.disconnect()