Source code for qsarify.feature_selection_single

#-*- coding: utf-8 -*-
# Author: Stephen Szwiec
# Date: 2023-02-19
# Description: Single-Threaded Feature Selection Module
#
#Copyright (C) 2023 Stephen Szwiec
#
#This file is part of qsarify.
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU General Public License for more details.
#
#You should have received a copy of the GNU General Public License
#along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Single-Threaded Feature Selection Module

This module contains the single-threaded version of the feature selection algorithm:
a genetic algorithm that scores each candidate set of features with a learning model
(multiple linear regression or random forest), using the output of clustering
to ensure that the selected features are not redundant.

"""
import datetime
import random
import numpy as np
import pandas as pd
import sklearn.linear_model as lm
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
import itertools

def mlr_selection(X_data, y_data, cluster_info, component, model="regression", learning=50000, bank=200, interval=1000):
    """
    Performs feature selection using a linear regression model and a genetic
    algorithm on a single thread. This is the vanilla version of the
    algorithm, which is not parallelized.

    Parameters
    ----------
    X_data: DataFrame, descriptor data
    y_data: DataFrame, target data
    cluster_info: dict, descriptor cluster information
    component: int, number of features to select
    model: str, learning algorithm to use, default = "regression"
    learning: int, number of iterations to perform, default = 50000
    bank: int, number of models to keep in the bank, default = 200
    interval: int, number of iterations to perform before printing the current time, default = 1000

    Returns
    -------
    best_model: list, the best set of features found
    """
    now = datetime.datetime.now()
    print("Start time: ", now.strftime('%H:%M:%S'))
    if model == "regression":
        print('\x1b[1;42m', 'Regression', '\x1b[0m')
        y_mlr = lm.LinearRegression()
        e_mlr = lm.LinearRegression()
    else:
        print('\x1b[1;42m', 'Classification', '\x1b[0m')
        y_mlr = SVC(kernel='rbf', C=1, gamma=0.1, random_state=0)
        e_mlr = SVC(kernel='rbf', C=1, gamma=0.1, random_state=0)
    # a list of numbered clusters
    nc = list(cluster_info.values())
    num_clusters = list(range(max(nc)))
    # extract information from the dictionary by inversion
    inv_cluster_info = dict()
    for k, v in cluster_info.items():
        inv_cluster_info.setdefault(v, list()).append(k)
    # an ordered list of features in each cluster
    cluster = list(dict(sorted(inv_cluster_info.items())).values())
    # fill the iteration bank with random models of 1 to `component` features,
    # using a set of sorted tuples to reject duplicate models
    index_sort_bank = set()
    model_bank = [ini_desc
                  for _ in range(bank)
                  for ini_desc in [sorted([random.choice(cluster[random.choice(num_clusters)])
                                           for _ in range(random.randint(1, component))])]
                  if tuple(ini_desc) not in index_sort_bank
                  and not index_sort_bank.add(tuple(ini_desc))]
    # score each set of features, saving each score and the corresponding feature set
    scoring_bank = list(map(lambda x: [y_mlr.fit(np.array(X_data.loc[:, x]), y_data.values.ravel()).score(np.array(X_data.loc[:, x]), y_data),
                                       list(X_data.loc[:, x].columns.values)],
                            model_bank))

    def evolve(i):
        """
        Evolution of a descriptor set for the learning algorithm,
        implemented as a function map

        Parameters
        ----------
        i: list, [score, descriptor set] pair
        """
        i = i[1]
        group_n = [cluster_info[x] - 1 for x in i]
        sw_index = random.randrange(0, len(i))
        # pick a replacement cluster not already represented in the model
        sw_group = random.randrange(0, max(nc))
        while sw_group in group_n:
            sw_group = random.randrange(0, max(nc))
        b_set = [random.choice(cluster[sw_group]) if x == sw_index else i[x] for x in range(0, len(i))]
        b_set.sort()
        x = X_data[b_set].values
        y = y_data.values.ravel()
        score = e_mlr.fit(x, y).score(x, y)
        return [score, b_set]

    # perform the main learning loop
    for n in range(learning):
        # evolve the bank of models; mutants that surpass the running best
        # score join the bank, which is then trimmed back to `bank` entries
        best_score = -float("inf")
        improved = []
        for candidate in map(evolve, scoring_bank):
            if candidate[0] > best_score:
                best_score = candidate[0]
                improved.append(candidate)
        scoring_bank = sorted(itertools.chain(scoring_bank, improved), reverse=True)[:bank]
        if n % interval == 0 and n != 0:
            tt = datetime.datetime.now()
            print(n, '=>', tt.strftime('%H:%M:%S'), scoring_bank[0])
    # print output and return the best model found during training
    print("Best score: ", scoring_bank[0][0])
    clulog = [cluster_info[y] for y in scoring_bank[0][1]]
    print("Model's cluster info", clulog)
    fi = datetime.datetime.now()
    fiTime = fi.strftime('%H:%M:%S')
    print("Finish Time : ", fiTime)
    return scoring_bank[0][1]
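
# Example call shape for mlr_selection (a minimal sketch; X_train, y_train and
# cluster_info are assumed to come from qsarify's data-handling and clustering
# steps, with cluster_info mapping each descriptor name to a 1-based cluster
# label):
#
#     selected = mlr_selection(X_train, y_train, cluster_info,
#                              component=4, learning=5000, bank=100,
#                              interval=500)
#     X_reduced = X_train[selected]
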
def rf_selection(X_data, y_data, cluster_info, component, model="regression", learning=50000, bank=200, interval=1000):
    """
    Performs feature selection using a random forest model and a genetic
    algorithm on a single thread. This is the vanilla version of the
    algorithm, which is not parallelized.

    Parameters
    ----------
    X_data: DataFrame, descriptor data
    y_data: DataFrame, target data
    cluster_info: dict, descriptor cluster information
    component: int, number of features to select
    model: str, learning algorithm to use, default = "regression"
    learning: int, number of iterations to perform, default = 50000
    bank: int, number of models to keep in the bank, default = 200
    interval: int, number of iterations to perform before printing the current time, default = 1000

    Returns
    -------
    best_model: list, the best set of features found
    """
    now = datetime.datetime.now()
    print("Start time: ", now.strftime('%H:%M:%S'))
    if model == "regression":
        print('\x1b[1;42m', 'Regression', '\x1b[0m')
        y_rf = RandomForestRegressor(n_estimators=100, max_depth=2, random_state=0)
        e_rf = RandomForestRegressor(n_estimators=100, max_depth=2, random_state=0)
    else:
        print('\x1b[1;42m', 'Classification', '\x1b[0m')
        y_rf = RandomForestClassifier(n_estimators=100, max_depth=2, random_state=0)
        e_rf = RandomForestClassifier(n_estimators=100, max_depth=2, random_state=0)
    # a list of numbered clusters
    nc = list(cluster_info.values())
    num_clusters = list(range(max(nc)))
    # extract information from the dictionary by inversion
    inv_cluster_info = dict()
    for k, v in cluster_info.items():
        inv_cluster_info.setdefault(v, list()).append(k)
    # an ordered list of features in each cluster
    cluster = list(dict(sorted(inv_cluster_info.items())).values())
    # fill the iteration bank with random models of 1 to `component` features,
    # using a set of sorted tuples to reject duplicate models
    index_sort_bank = set()
    model_bank = [ini_desc
                  for _ in range(bank)
                  for ini_desc in [sorted([random.choice(cluster[random.choice(num_clusters)])
                                           for _ in range(random.randint(1, component))])]
                  if tuple(ini_desc) not in index_sort_bank
                  and not index_sort_bank.add(tuple(ini_desc))]
    # score each set of features, saving each score and the corresponding feature set
    scoring_bank = list(map(lambda x: [y_rf.fit(np.array(X_data.loc[:, x]), y_data.values.ravel()).score(np.array(X_data.loc[:, x]), y_data),
                                       list(X_data.loc[:, x].columns.values)],
                            model_bank))

    def evolve(i):
        """
        Evolution of a descriptor set for the learning algorithm,
        implemented as a function map

        Parameters
        ----------
        i: list, [score, descriptor set] pair
        """
        i = i[1]
        group_n = [cluster_info[x] - 1 for x in i]
        sw_index = random.randrange(0, len(i))
        # pick a replacement cluster not already represented in the model
        sw_group = random.randrange(0, max(nc))
        while sw_group in group_n:
            sw_group = random.randrange(0, max(nc))
        b_set = [random.choice(cluster[sw_group]) if x == sw_index else i[x] for x in range(0, len(i))]
        b_set.sort()
        x = X_data[b_set].values
        y = y_data.values.ravel()
        score = e_rf.fit(x, y).score(x, y)
        return [score, b_set]

    # perform the main learning loop
    for n in range(learning):
        # evolve the bank of models; mutants that surpass the running best
        # score join the bank, which is then trimmed back to `bank` entries
        best_score = -float("inf")
        improved = []
        for candidate in map(evolve, scoring_bank):
            if candidate[0] > best_score:
                best_score = candidate[0]
                improved.append(candidate)
        scoring_bank = sorted(itertools.chain(scoring_bank, improved), reverse=True)[:bank]
        if n % interval == 0 and n != 0:
            tt = datetime.datetime.now()
            print(n, '=>', tt.strftime('%H:%M:%S'), scoring_bank[0])
    # print output and return the best model found during training
    print("Best score: ", scoring_bank[0][0])
    clulog = [cluster_info[y] for y in scoring_bank[0][1]]
    print("Model's cluster info", clulog)
    fi = datetime.datetime.now()
    fiTime = fi.strftime('%H:%M:%S')
    print("Finish Time : ", fiTime)
    return scoring_bank[0][1]
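
if __name__ == "__main__":
    # Minimal smoke test (illustrative only): random descriptor data and an
    # arbitrary cluster assignment stand in for real qsarify inputs; every
    # name and shape below is an assumption made purely for demonstration.
    rng = np.random.default_rng(0)
    X_demo = pd.DataFrame(rng.normal(size=(40, 10)),
                          columns=[f"d{i}" for i in range(10)])
    y_demo = pd.DataFrame(rng.normal(size=(40, 1)), columns=["activity"])
    # assign each descriptor to one of 5 clusters, labeled 1..5, matching the
    # 1-based cluster labels the selection routines expect
    demo_clusters = {col: (i % 5) + 1 for i, col in enumerate(X_demo.columns)}
    best = mlr_selection(X_demo, y_demo, demo_clusters, component=3,
                         learning=200, bank=20, interval=100)
    print("Selected features:", best)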