Using XGBoost models with Python's Flower
The idea here is to use Flower to create a federated infrastructure to run an XGBClassifier on a series of datasets located on exclusive-access nodes.
This requires a central server, which connects the nodes and aggregates the models, as well as a local client on each node, which trains the models on the exclusive-access datasets.
Client code
The code assumes that there is an environment variable called DATA_PATH pointing to a CSV file containing the data to be used. With this in mind, the first step is to create the XGBClient, which wraps a NumPyClient from flwr.
We should start with the imports:
import os
import flwr as fl
import xgboost as xgb
from utils_flwr_xgboost import *
We will use the os module to access the DATA_PATH variable, flwr to create the wrapper and start the client, xgb to access the model class, and utils_flwr_xgboost, a utility module with five functions: load_data, prepare_data, get_parameters, model_from_paramaters, and get_metrics_from_model. We will describe them as we implement the client and the server.
We will start with the generic code to load the data:
tmp_path = '/home/.../tmp-cl/'
X_train, X_test, y_train, y_test = prepare_data( os.getenv( 'DATA_PATH' ) )
The function prepare_data calls load_data and then splits the data into training and testing sets:
import pandas as pd
from sklearn.model_selection import train_test_split

def load_data( data_path ):
    data = pd.read_csv( data_path )
    code_id = 'ID'
    code_outcome = 'class'
    X = data.drop( [ code_id, code_outcome ], axis = 1 )
    y = data[ code_outcome ]
    return X, y

def prepare_data( data_path, test_size = 0.2, random_state = 42 ):
    X, y = load_data( data_path )
    X_train, X_test, y_train, y_test = train_test_split( X, y, test_size = test_size, random_state = random_state )
    return X_train, X_test, y_train, y_test
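If you want to try this locally before wiring up the environment variable, a toy CSV matching what load_data expects can be generated like this (the path and feature names are made up for illustration):

import numpy as np
import pandas as pd

# Hypothetical toy dataset: the 'ID' and 'class' columns match what load_data expects
df = pd.DataFrame( {
    'ID': range( 100 ),
    'f1': np.random.rand( 100 ),
    'f2': np.random.rand( 100 ),
    'class': np.random.randint( 0, 2, 100 ),
} )
df.to_csv( '/tmp/toy.csv', index = False )

X_train, X_test, y_train, y_test = prepare_data( '/tmp/toy.csv' )
print( X_train.shape, X_test.shape )   # (80, 2) (20, 2)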
Now, let's go for the XGBClient implementation:
class XGBClient( fl.client.NumPyClient ):

    def __init__( self ):
        # Train an initial local model so the client always has parameters to share
        self.model = xgb.XGBClassifier()
        self.model.fit( X_train, y_train )

    def get_parameters( self, config ):
        return get_parameters( self.model, tmp_path )

    def fit( self, parameters, config ):
        # Rebuild the model from the server's parameters, then refine it locally
        self.model = model_from_paramaters( parameters, tmp_path )
        self.model.fit( X_train, y_train )
        return (
            get_parameters( self.model, tmp_path ),
            len( X_train ),
            get_metrics_from_model( self.model, X_test, y_test )
        )

    def evaluate( self, parameters, config ):
        # Score the server's parameters on the local test set
        self.model = model_from_paramaters( parameters, tmp_path )
        metrics = get_metrics_from_model( self.model, X_test, y_test )
        return (
            metrics[ 'loss' ],
            int( len( X_test ) ),
            metrics
        )
Here the fun starts!
Our XGBClient implements the three methods required for the infrastructure to communicate model updates between client and server: get_parameters, fit, and evaluate.
get_parameters
This method, calling the get_parameters function from our utility module, packages the “parameters” of the model. The name “parameters” can be a bit confusing. For the XGBoost family of models, those “parameters” are the weights of the leaves. For comparison, for a linear or logistic model, the “parameters” would be the coefficients of the model.
In the case of XGBoost models, we need to obtain the full definition of the forest of trees. Following XGBoost's documentation we see that we could use either dump_model or save_model. But if we read carefully, only a definition created with save_model can later be loaded again to recreate the model.
import json
from pathlib import Path

def get_parameters( model, tmp_path ):
    tmp_model = Path( tmp_path ) / 'model_tmp_prms.json'
    model.get_booster().save_model( tmp_model )
    with open( tmp_model, 'r' ) as fr:
        params = json.load( fr )
    return [ json.dumps( params ) ]
Therefore we need a temporary JSON file where the model is saved. We then read this file back and return its content serialized as a JSON string. Note that the get_parameters method of our client needs to return a list.
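As a quick sanity check (assuming tmp_path and the training data have been set up as above), we can instantiate the client and inspect what it returns:

client = XGBClient()
params = client.get_parameters( {} )
print( type( params ), len( params ) )   # <class 'list'> 1
print( params[ 0 ][ :80 ] )              # beginning of the serialized booster JSON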
fit
This method is the one in charge of training the model given a set of parameters. It first needs to recreate a model from those parameters; the function model_from_paramaters is in charge of doing so:
import json
from pathlib import Path
import xgboost as xgb

def model_from_paramaters( parameters, tmp_path ):
    parameters = parameters[ 0 ]
    tmp_model = Path( tmp_path ) / 'model_tmp_fit.json'
    parameters = json.loads( str( parameters ) )
    with open( tmp_model, 'w' ) as fw:
        json.dump( parameters, fw )
    model = xgb.XGBClassifier()
    model.load_model( tmp_model )
    return model
We start by parsing the given serialized JSON into a dictionary. Then, we save the model definition to a file and load this file to recreate the model using the load_model method.
Once the model is created from the set of parameters, we fit it and return the required results: the newly refined parameters, the size of the training set, and some quality metrics.
from sklearn.metrics import log_loss

def get_metrics_from_model( model, X_test, y_test ):
    loss = log_loss( y_test, model.predict_proba( X_test ) )
    accuracy = float( model.score( X_test, y_test ) )
    return { 'loss': float( loss ), 'accuracy': accuracy }
evaluate
This method evaluates a given set of parameters on the local dataset and returns the metrics.
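What is still missing is the call that connects the client to the server. A minimal sketch, assuming the server from the next section is reachable on localhost port 8080:

# The address is an assumption; point it to wherever your server is running
fl.client.start_numpy_client(
    server_address = "127.0.0.1:8080",
    client = XGBClient()
)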
Server code
Lastly, we will write the code used on the server side. As for our client, the code assumes that there is an environment variable called DATA_PATH pointing to a CSV file containing the data to be used.
The following imports are needed by the server script.
import os
import json
import flwr as fl
import numpy as np
import xgboost as xgb
from flwr.common import ndarrays_to_parameters, parameters_to_ndarrays
from utils_flwr_xgboost import *
Now, we will be implementing not a flwr server but a flwr strategy. A strategy is in charge of defining how the federation works, as well as initializing parameters and aggregating models. We will use FedAvg as our base strategy.
class XGBAvgStrategy( fl.server.strategy.FedAvg ):

    def aggregate_fit( self, rnd, results, failures ):
        if not results:
            return None, {}
        # Unpack what each client sent back: status, serialized model, sample count, metrics
        status = [ x[1].status for x in results ]
        parameters = [ json.loads( parameters_to_ndarrays( x[1].parameters )[ 0 ].item( 0 ) ) for x in results ]
        num_examples = [ x[1].num_examples for x in results ]
        metrics = [ x[1].metrics for x in results ]
        # Aggregate the client models, then refine the result on the server's own data
        aggregated_params = self.aggregate( status, parameters, num_examples, metrics )
        model = model_from_paramaters( aggregated_params, tmp_path )
        model.fit( X_train, y_train )
        return ndarrays_to_parameters( np.array( get_parameters( model, tmp_path ) ) ), {}

    def aggregate( self, status, parameters, num_examples, metrics ):
        # [ this one is up to you :-) ]
        parameters = json.dumps( parameters[ 0 ] )
        return [ parameters ]
aggregate_fit
This method is responsible for aggregating the models obtained from the multiple nodes and returning a single model built from them. In our implementation, this is where we extract all the required data from each node and model; each element of results is a (ClientProxy, FitRes) tuple, hence the x[1] indexing. For instance: the status variable indicates whether the transmission between each node and the server worked or failed, parameters will have all the dictionaries loaded from the transmitted JSONs, num_examples will provide the number of samples used to train each model, and metrics will have the metrics of each node.
Then, we call our custom aggregate method with those lists, and create a new model from the aggregated parameters before sending it back to the clients to be trained again.
The tricky step here is converting the parameters between Flower's wire format and our serialized JSON: we use parameters_to_ndarrays to read the parameters obtained from the clients, and ndarrays_to_parameters to send them back.
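To make the conversion concrete, here is a small round-trip sketch with a made-up payload:

import json
import numpy as np
from flwr.common import ndarrays_to_parameters, parameters_to_ndarrays

payload = [ json.dumps( { 'demo': True } ) ]           # one JSON string, as our clients send
wire = ndarrays_to_parameters( np.array( payload ) )   # Flower's wire format
back = parameters_to_ndarrays( wire )[ 0 ].item( 0 )   # back to the JSON string
assert json.loads( back ) == { 'demo': True }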
aggregate
Here is where the magic happens: the XGBClassifier models from all the clients are combined and a single new set of parameters is returned. The placeholder above simply returns the first client's model, so the real aggregation is left up to you.
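A slightly less arbitrary placeholder, assuming no true tree-merging scheme yet, could keep the forest from the client that trained on the most samples:

    def aggregate( self, status, parameters, num_examples, metrics ):
        # Naive placeholder: keep the forest trained on the most samples.
        # A real aggregation would merge or re-weight trees across clients.
        best = max( range( len( parameters ) ), key = lambda i: num_examples[ i ] )
        return [ json.dumps( parameters[ best ] ) ]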
Finally, we only need to load the data used to assign the initial set of parameters and start the server with our strategy.
tmp_path = '/home/.../xgboost/tmp-sr/'
X_train, X_test, y_train, y_test = prepare_data( os.getenv( 'DATA_PATH' ) )

fl.server.start_server(
    server_address = "[::]:8080",
    config = fl.server.ServerConfig( num_rounds = 3 ),
    strategy = XGBAvgStrategy()
)