2 # -*- coding: utf-8 -*-
4 # Copyright 2020 ArctosLabs Scandinavia AB
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
22 from pathlib
import Path
24 # import pkg_resources
26 from osm_common
import dbmemory
, dbmongo
, msglocal
, msgkafka
28 from osm_pla
.config
.config
import Config
29 from osm_pla
.placement
.mznplacement
import MznPlacementConductor
30 from osm_pla
.placement
.mznplacement
import NsPlacementDataFactory
34 pil_price_list_file
= Path('/placement/pil_price_list.yaml')
35 vnf_price_list_file
= Path('/placement/vnf_price_list.yaml')
37 def __init__(self
, config
: Config
, loop
=None):
38 self
.log
= logging
.getLogger("pla.server")
42 self
.loop
= loop
or asyncio
.get_event_loop()
45 if config
.get('database', 'driver') == "mongo":
46 self
.db
= dbmongo
.DbMongo()
47 self
.db
.db_connect(config
.get('database'))
48 elif config
.get('database', 'driver') == "memory":
49 self
.db
= dbmemory
.DbMemory()
50 self
.db
.db_connect(config
.get('database'))
52 raise Exception("Invalid configuration param '{}' at '[database]':'driver'".format(
53 config
.get('database', 'driver')))
55 if config
.get('message', 'driver') == "local":
56 self
.msgBus
= msglocal
.MsgLocal()
57 elif config
.get('message', 'driver') == "kafka":
58 self
.msgBus
= msgkafka
.MsgKafka()
60 raise Exception("Invalid message bus driver {}".format(
61 config
.get('message', 'driver')))
62 self
.msgBus
.loop
= loop
63 self
.msgBus
.connect(config
.get('message'))
65 except Exception as e
:
66 self
.log
.exception("kafka setup error. Exception: {}".format(e
))
68 def _get_nslcmop(self
, nsdlcmop_id
):
71 :return: nslcmop from database corresponding to nslcmop_id
73 db_filter
= {"_id": nsdlcmop_id
}
74 nslcmop
= self
.db
.get_one("nslcmops", db_filter
)
77 def _get_nsd(self
, nsd_id
):
80 :return: nsd from database corresponding to nsd_id
82 db_filter
= {"_id": nsd_id
}
83 return self
.db
.get_one("nsds", db_filter
)
85 def _get_vim_accounts(self
, vim_account_ids
):
87 :param vim_account_ids: list of VIM account ids
88 :return: list of vim account entries from database corresponding to list in vim_accounts_id
90 db_filter
= {"_id": vim_account_ids
}
91 return self
.db
.get_list("vim_accounts", db_filter
)
93 def _get_vnf_price_list(self
, price_list_file_path
):
95 read vnf price list configuration file and reformat its content
97 :param: price_list_file: Path to price list file
98 :return: dictionary formatted as {'<vnfd>': {'<vim-url>':'<price>'}}
100 with
open(str(price_list_file_path
)) as pl_fd
:
101 price_list_data
= yaml
.safe_load_all(pl_fd
)
102 return {i
['vnfd']: {i1
['vim_url']: i1
['price'] for i1
in i
['prices']} for i
in next(price_list_data
)}
104 def _get_pil_info(self
, pil_info_file_path
):
106 read and return pil information from file
107 :param pil_info_file_path: Path to pil_info file
108 :return pil configuration file content as Python object
110 with
open(str(pil_info_file_path
)) as pil_fd
:
111 data
= yaml
.safe_load_all(pil_fd
)
114 async def get_placement(self
, nslcmop_id
):
116 - Collects and prepares placement information.
117 - Request placement computation.
118 - Formats and distribute placement result
120 Note: exceptions result in empty response message
126 nslcmop
= self
._get
_nslcmop
(nslcmop_id
)
127 nsd
= self
._get
_nsd
(nslcmop
['operationParams']['nsdId'])
128 self
.log
.info("nsd: {}".format(nsd
))
129 valid_vim_accounts
= nslcmop
['operationParams']['validVimAccounts']
130 vim_accounts_data
= self
._get
_vim
_accounts
(valid_vim_accounts
)
131 vims_information
= {_
['vim_url']: _
['_id'] for _
in vim_accounts_data
}
132 price_list
= self
._get
_vnf
_price
_list
(Server
.vnf_price_list_file
)
133 pil_info
= self
._get
_pil
_info
(Server
.pil_price_list_file
)
134 pinning
= nslcmop
['operationParams'].get('vnf')
135 self
.log
.info("pinning: {}".format(pinning
))
136 order_constraints
= nslcmop
['operationParams'].get('placement-constraints')
137 self
.log
.info("order constraints: {}".format(order_constraints
))
139 nspd
= NsPlacementDataFactory(vims_information
,
143 pinning
, order_constraints
).create_ns_placement_data()
145 vnf_placement
= MznPlacementConductor(self
.log
).do_placement_computation(nspd
)
147 except Exception as e
:
148 # Note: there is no cure for failure so we have a catch-all clause here
149 self
.log
.exception("PLA fault. Exception: {}".format(e
))
152 await self
.msgBus
.aiowrite("pla", "placement",
153 {'placement': {'vnf': vnf_placement
, 'nslcmopId': nslcmop_id
}})
155 def handle_kafka_command(self
, topic
, command
, params
):
156 self
.log
.info("Kafka msg arrived: {} {} {}".format(topic
, command
, params
))
157 if topic
== "pla" and command
== "get_placement":
158 nslcmop_id
= params
.get('nslcmopId')
159 self
.loop
.create_task(self
.get_placement(nslcmop_id
))
161 async def kafka_read(self
):
162 self
.log
.info("Task kafka_read start")
166 await self
.msgBus
.aioread(topics
, self
.loop
, self
.handle_kafka_command
)
167 except Exception as e
:
168 self
.log
.error("kafka read error. Exception: {}".format(e
))
169 await asyncio
.sleep(5, loop
=self
.loop
)
172 self
.loop
.run_until_complete(self
.kafka_read())
176 self
.msgBus
.disconnect()