This commit is contained in:
atla8167 2024-12-08 17:47:01 +02:00
parent 1887aa088e
commit 24ed3fcc51
23 changed files with 1910 additions and 0 deletions

107
base/glacier/README.md Executable file

@ -0,0 +1,107 @@
# Glacier: Guided Locally Constrained Counterfactual Explanations for Time Series Classification
Glacier: Guided Locally Constrained Counterfactual Explanations for Time Series Classification<br/>
Published in the Machine Learning (MACH) journal, March 2024.<br/>
(Originally published at the DS'2021 Conference)<br/>
[[paper_link]](https://doi.org/10.1007/s10994-023-06502-x)
We propose `Glacier`, a model-agnostic method for generating *locally constrained counterfactual explanations* for time series classification using gradient search, either in the original space or in a latent space learned by an auto-encoder. An additional flexibility of our method is the inclusion of constraints on the counterfactual generation process that favour changes to particular time series points or segments while discouraging changes to others. The main purpose of these constraints is to ensure the generation of more realistic and feasible counterfactuals. We conduct extensive experiments on a total of 40 datasets from the UCR archive, comparing different instantiations of Glacier against three competitors.
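At a high level, for an input series $x$ and a desired (opposite) class $y'$, the guided gradient search (as implemented in `src/_guided.py`) minimizes a weighted combination of a prediction-margin term and a constrained proximity term, roughly:

$$\mathcal{L}(x') = w \, \big(\tau - f_{y'}(x')\big)^2 + (1 - w) \, \frac{1}{T} \sum_{t=1}^{T} \theta_t \, \lvert x_t - x'_t \rvert$$

where $\tau$ is the decision boundary threshold, $w$ is the prediction margin weight, and $\theta_t$ are per-timestep constraint weights ($\theta_t = 0$ where changes are favoured, $\theta_t = 1$ where they are discouraged). This is a summary of the implementation rather than the exact formulation from the paper.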
If you find this GitHub repo useful in your research work, please consider citing our paper:
```
@article{wang_glacier_2024,
title = {Glacier: guided locally constrained counterfactual explanations for time series classification},
volume = {113},
issn = {1573-0565},
doi = {10.1007/s10994-023-06502-x},
number = {3},
journal = {Machine Learning},
author = {Wang, Zhendong and Samsten, Isak and Miliou, Ioanna and Mochaourab, Rami and Papapetrou, Panagiotis},
month = mar,
year = {2024},
}
```
Please also consider citing the original publication:
```
@inproceedings{wang_learning_2021,
title = {Learning Time Series Counterfactuals via Latent Space Representations},
booktitle = {International Conference on Discovery Science},
author = {Wang, Zhendong and Samsten, Isak and Mochaourab, Rami and Papapetrou, Panagiotis},
year = {2021},
}
```
## Dependencies
For reproducibility, all experiments and computational runtimes are evaluated on an NVIDIA GeForce RTX 2080 (GPU) and an AMD Ryzen Threadripper 2950X 16-Core Processor (CPU), with the following dependencies:
- python=3.8
- cudatoolkit=11.2
- cudnn=8.1
### Installation:
We recommend creating a conda virtual environment with the following command:
```
conda env create -f environment.yml
```
## Data preparation
We mainly focus on the problem of binary classification with univariate time series data. After filtering, a subset of 40 datasets from the [UCR archive](https://www.cs.ucr.edu/eamonn/time_series_data_2018/) is selected, containing various representations from different data sources. For example, *TwoLeadECG* represents ECG measurements in the medical domain and *Wafer* exemplifies sensor data in semiconductor manufacturing. The time series length in our chosen subset varies from `24` timesteps (*ItalyPowerDemand*) to `2709` timesteps (*HandOutlines*).
For the evaluation in our experiment, we apply 5-fold cross-validation with stratified splits, using a standard 80/20 training/testing split for all the datasets. To compensate for imbalanced target classes in specific datasets, we apply a simple upsampling technique and then generate counterfactual explanations for a subset of 50 test samples selected with a fixed random seed.
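As a minimal sketch (the helper name `select_cf_subset` is ours and not part of the repository), this selection mirrors the stratified split with a fixed random seed used in the experiment script:
```
import numpy as np
from sklearn.model_selection import train_test_split

def select_cf_subset(y_test, n_samples=50, random_state=39):
    # Stratified selection of test indices for counterfactual generation,
    # with a fixed random seed (helper name is illustrative).
    test_indices = np.arange(len(y_test))
    if len(y_test) <= n_samples:
        return test_indices
    _, _, _, rand_test_idx = train_test_split(
        y_test,
        test_indices,
        test_size=n_samples,
        random_state=random_state,
        stratify=y_test,
    )
    return rand_test_idx
```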
## Experiment Setup
In the experiment, we train one of the most recent state-of-the-art CNN models, `LSTM-FCN`, as the black-box model to which we apply the Glacier counterfactual framework. We then employ two autoencoders to transform the original samples into counterfactuals: a 1-dimensional convolutional neural network (`1dCNN`) and an `LSTM` model, together with a version of gradient search directly in the original space (`NoAE`).
Thus, we explore four variants of Glacier: *unconstrained*, *example-specific*, *global*, and *uniform*, across 40 UCR datasets. In our main experiment, we set the decision boundary threshold $\tau=0.5$ and the prediction margin weight $w=0.5$ in the optimization function. The variants are described below, with a minimal sketch of the corresponding constraint weights after the list:
- *unconstrained*: encouraging changes in the whole time series;
- *example-specific*: imposing constraints that favour changes to particular time series points of a given test example, using LIMESegment (Sivill et al., 2022);
- *global*: imposing constraints that favour changes to particular time series points across all test examples, using interval importance;
- *uniform*: discouraging changes across the whole time series.
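As referenced above, a minimal sketch of how the variants map to constraint weights (the helper below is illustrative and not part of the repository; the explainer itself lives in `src/_guided.py`):
```
import numpy as np

def make_step_weights(w_type, n_timesteps, n_features=1):
    # Illustrative mapping from a Glacier variant to the per-timestep
    # constraint weights expected by the explainer in src/_guided.py.
    if w_type == "unconstrained":
        # zero weights: the proximity penalty ignores every timestep
        return np.zeros((1, n_timesteps, n_features))
    if w_type == "uniform":
        # uniform weights: changes are discouraged equally across the whole series
        return np.ones((1, n_timesteps, n_features))
    if w_type == "local":
        # "local" = example-specific: the explainer calls LIMESegment per test example
        return "local"
    if w_type == "global":
        # computed once via interval importance over the test set,
        # see get_global_weights() in src/_guided.py
        raise NotImplementedError("use get_global_weights(X, y, classifier)")
    raise ValueError(f"unknown variant: {w_type}")
```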
### Baseline Models
We additionally compare with three baseline models for counterfactual generation:
- (Karlsson et al., 2018) *Random shapelet forest (RSF)*: we set the number of estimators to `50` and max depth to `5`, while the other parameters are kept at the default values (see the original implementation: `https://github.com/isaksamsten/wildboar`);
- (Karlsson et al., 2018) *k-NN counterfactual (k-NN)*: we train a k-NN classifier with `k=5` and the Euclidean distance metric;
- (Delaney et al., 2021) *Native Guide (NG)*: we adopt the implementation and default parameters from `https://github.com/e-delaney/Instance-Based_CFE_TSC`.
## Running the comparison
The implementation of our experiment is available [here](./src/gc_latentcf_search.py), with a detailed description of the Glacier model parameters. To run the experiment for a single dataset (e.g. *TwoLeadECG*), simply run:
```
python src/gc_latentcf_search.py --dataset TwoLeadECG --pos 1 --neg 2 --output twoleadecg-outfile.csv --w-type local --lr-list 0.0001 0.0001 0.0001;
```
To run the experiment for all 40 UCR datasets:
```
bash run_all_datasets.sh
```
and to run the three counterfactual baselines:
```
bash run_cf_baselines.sh
```
Finally, we conduct an ablation study to examine the effects of two hyperparameters in Glacier on the *TwoLeadECG* dataset: the prediction margin weight $w$ and the decision boundary threshold $\tau$. For $w$, run:
```
bash run_ablation_pred_margin.sh
```
and for the decision boundary threshold $\tau$:
```
bash run_ablation_tau.sh
```
## Results
The results of the main analysis are available in this [csv file](./results/results_final_all.csv), together with the two ablation study [csv files](./results/).
In addition, we show two example cases generated by Glacier. The first shows counterfactuals for ECG measurements using the *unconstrained* and *example-specific* constraints:
![examples](./results/results_example_local.png)
The second shows counterfactuals for engine signals using the *unconstrained* and *global* constraints:
![examples](./results/results_example_global.png)
In both cases, the original time series are shown in blue and the generated counterfactuals of the opposite class in red. The red bars indicate the time series points for which changes are favoured.
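To reproduce a similar visualization for your own runs, a minimal plotting sketch (the file names `X_test.csv` and `X_cf.npy` follow the outputs written by the experiment scripts and may need adjusting):
```
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# X_test.csv and X_cf.npy are written by the experiment scripts;
# adjust the paths to your own output directory.
x_test = pd.read_csv("X_test.csv", index_col=None).values
x_cf = np.load("X_cf.npy").reshape(x_test.shape[0], -1)

i = 0  # index of the test example to inspect
plt.plot(x_test[i], color="tab:blue", label="original")
plt.plot(x_cf[i], color="tab:red", label="counterfactual")
plt.legend()
plt.title("Original vs. generated counterfactual")
plt.show()
```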

@ -0,0 +1,214 @@
from scipy import signal
import numpy as np
import stumpy
from sklearn.linear_model import Ridge
from sklearn.utils import check_random_state
from fastdtw import fastdtw
import random
def NNSegment(t, window_size, change_points):
"""Return the change points of given time series
Input:
t - numpy array of size T
window_size - int where window_size < T
change_points - user specified number of change points
Output:
np.array of change point indexes
"""
"""Return the change points of given time series
Input:
t - numpy array of size T
window_size - int where window_size < T
change_points - user specified number of change points
Output:
np.array of change point indexes
"""
mp = stumpy.stump(t, m=window_size)
proposed_cp = [i for i in range(0,mp.shape[0]-1) if mp[i+1,1] != mp[i,1] + 1]
tolerance = int(window_size/2)
variances = []
for idx in proposed_cp:
mean_change = np.abs(np.mean(t[idx-tolerance:idx]) - np.mean(t[idx:idx+tolerance]))
std_change = np.abs(np.std(t[idx-tolerance:idx]) - np.std(t[idx:idx+tolerance]))
std_mean = np.mean([np.std(t[idx-tolerance:idx]),np.std(t[idx:idx+tolerance])])
variances.append(mean_change*std_change/std_mean)
sorted_idx = np.flip(np.array(variances).argsort())
sorted_cp = [proposed_cp[idx] for idx in sorted_idx]
selected_cp = []
covered = list(np.arange(0,tolerance))
ic, i = 0, 0
while ic < change_points:
if i == len(sorted_cp):
break
if sorted_cp[i] not in covered:
ic += 1
selected_cp.append(sorted_cp[i])
covered = covered + list(np.arange(sorted_cp[i],sorted_cp[i]+tolerance))
covered = covered + list(np.arange(sorted_cp[i]-tolerance,sorted_cp[i]))
i +=1
selected_cp = np.sort(np.asarray(selected_cp))
return(list(selected_cp))
def backgroundIdentification(original_signal,f=40):
f, t, Zxx = signal.stft(original_signal.reshape(original_signal.shape[0]),1,nperseg=f)
frequency_composition_abs = np.abs(Zxx)
measures = []
for freq,freq_composition in zip(f,frequency_composition_abs):
measures.append(np.mean(freq_composition)/np.std(freq_composition))
max_value = max(measures)
selected_frequency = measures.index(max_value)
weights = 1-(measures/sum(measures))
dummymatrix = np.zeros((len(f),len(t)))
dummymatrix[selected_frequency,:] = 1
#Option to admit information from other frequency bands
"""dummymatrix = np.ones((len(f),len(t)))
for i in range(0,len(weights)):
dummymatrix[i,:] = dummymatrix[i,:] * weights[i]"""
background_frequency = Zxx * dummymatrix
_, xrec = signal.istft(background_frequency, 1)
xrec = xrec[:original_signal.shape[0]]
xrec = xrec.reshape(original_signal.shape)
return xrec
def RBP(generated_samples_interpretable, original_signal, segment_indexes, f):
generated_samples_raw = []
xrec = backgroundIdentification(original_signal)
for sample_interpretable in generated_samples_interpretable:
raw_signal = original_signal.copy()
for index in range(0,len(sample_interpretable)-1):
if sample_interpretable[index] == 0:
index0 = segment_indexes[index]
index1 = segment_indexes[index+1]
raw_signal[index0:index1] = xrec[index0:index1]
generated_samples_raw.append(np.asarray(raw_signal))
return np.asarray(generated_samples_raw)
def RBPIndividual(original_signal, index0, index1):
xrec = backgroundIdentification(original_signal)
raw_signal = original_signal.copy()
raw_signal[index0:index1] = xrec[index0:index1]
return raw_signal
def LIMESegment(example, model, model_type='class', distance='dtw', n=100, window_size=None, cp=None, f=None, random_state=None):
random_state = check_random_state(random_state)
if window_size is None:
window_size =int(example.shape[0]/5)
if cp is None:
cp = 3
if f is None:
f = int(example.shape[0]/10)
cp_indexes = NNSegment(example.reshape(example.shape[0]), window_size, cp)
segment_indexes = [0] + cp_indexes + [-1]
generated_samples_interpretable = [random_state.binomial(1, 0.5, len(cp_indexes)+1) for _ in range(0,n)] #Update here with random_state
generated_samples_raw = RBP(generated_samples_interpretable, example, segment_indexes, f)
sample_predictions = model.predict(generated_samples_raw)
if model_type == 'proba':
y_labels = np.argmax(sample_predictions, axis=1)
elif isinstance(model_type, int): #Update here to use the probability of the target class
y_labels = sample_predictions[:, model_type]
else:
y_labels = sample_predictions
if distance == 'dtw':
distances = np.asarray([fastdtw(example, sample)[0] for sample in generated_samples_raw])
weights = np.exp(-(np.abs((distances - np.mean(distances))/np.std(distances)).reshape(n,)))
elif distance == 'euclidean':
distances = np.asarray([np.linalg.norm(np.ones(len(cp_indexes)+1)-x) for x in generated_samples_interpretable])
weights = np.exp(-(np.abs(distances**2/0.75*(len(segment_indexes)**2)).reshape(n,)))
clf = Ridge(random_state=random_state) #Update here with random_state
clf.fit(generated_samples_interpretable,y_labels, weights)
return clf.coef_, segment_indexes
def background_perturb(original_signal, index0, index1, X_background):
perturbed_signal = original_signal.copy()
selected_background_ts = X_background[random.randint(0, 20)]
perturbed_signal[index0:index1] = selected_background_ts.reshape(perturbed_signal.shape)[index0:index1]
return np.asarray(perturbed_signal)
def mean_perturb(original_signal, index0, index1, mean_value, ws):
perturbed_signal = original_signal.copy()
mean_signal = np.ones(original_signal.shape)*mean_value
perturbed_signal[index0:index1] = mean_signal[index0:index1]
return perturbed_signal
def calculate_mean(cp_indexes, X_background, ws):
sample_averages = []
for ts in X_background:
window_averages = np.mean([np.mean(ts[i:i+ws]) for i in cp_indexes])
sample_averages.append(window_averages)
return np.mean(sample_averages)
def LEFTIST(example, model, X_background, model_type="class", n=100):
ts_length = example.shape[0]
cp_indexes = [i for i in range(0,example.shape[0],int(example.shape[0]/10))]
example_interpretable = np.ones(len(cp_indexes))
generated_samples_interpretable = [ np.random.binomial(1, 0.5, len(cp_indexes)) for _ in range(0,n)]
generated_samples_original = []
segment_indexes = cp_indexes + [-1]
for sample_interpretable in generated_samples_interpretable:
raw_sample = example.copy()
for index in range(0,len(sample_interpretable)):
if sample_interpretable[index] == 0:
index0 = segment_indexes[index]
index1 = segment_indexes[index+1]
raw_sample = background_perturb(raw_sample,index0,index1,X_background)
generated_samples_original.append((raw_sample))
sample_predictions = model.predict(np.asarray(generated_samples_original))
if model_type == 'proba':
y_labels = np.argmax(sample_predictions, axis=1)
else:
y_labels = sample_predictions
distances = np.asarray([np.linalg.norm(np.ones(len(cp_indexes))-x) for x in generated_samples_interpretable])
weights = np.exp(-(np.abs(distances**2/(len(segment_indexes)**2))))
clf = Ridge()
clf.fit(generated_samples_interpretable,y_labels,weights)
return clf.coef_, cp_indexes
def NEVES(example, model, X_background, model_type="class", n=100):
ts_length = example.shape[0]
cp_indexes = [i for i in range(0,example.shape[0],int(example.shape[0]/10))]
example_interpretable = np.ones(len(cp_indexes))
generated_samples_interpretable = [ np.random.binomial(1, 0.5, len(cp_indexes)) for _ in range(0,n)]
generated_samples_original = []
mean_perturb_value = calculate_mean(cp_indexes, X_background, int(cp_indexes[1]-cp_indexes[0]))
segment_indexes = cp_indexes + [ts_length]
for sample_interpretable in generated_samples_interpretable:
raw_sample = example.copy()
for index in range(0,len(sample_interpretable)):
if sample_interpretable[index] == 0:
index0 = segment_indexes[index]
index1 = segment_indexes[index+1]
raw_sample = mean_perturb(raw_sample,index0,index1,mean_perturb_value,int(cp_indexes[1]-cp_indexes[0]))
generated_samples_original.append(raw_sample)
sample_predictions = model.predict(np.asarray(generated_samples_original))
if model_type == 'proba':
y_labels = np.argmax(sample_predictions, axis=1)
else:
y_labels = sample_predictions
distances = np.asarray([np.linalg.norm(np.ones(len(cp_indexes))-x) for x in generated_samples_interpretable])
weights = np.exp(-(np.abs(distances**2/(len(segment_indexes)**2))).reshape(n,))
clf = Ridge()
clf.fit(generated_samples_interpretable,y_labels,weights)
return clf.coef_, cp_indexes
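# Usage sketch (illustrative; variable names are assumptions): explaining a single test
# series `ts_example` with a trained Keras classifier, as Glacier does in src/_guided.py.
#
#   seg_importances, seg_indexes = LIMESegment(
#       ts_example, keras_classifier, model_type="proba",
#       window_size=10, cp=10, random_state=39,
#   )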

@ -0,0 +1,53 @@
import numpy as np
def add_noise(ts):
mu, sigma = 0, 0.1 # mean and standard deviation
noise = np.random.normal(mu, sigma, ts.shape[0])
noisy_ts = np.add(ts.reshape(ts.shape[0]),noise.reshape(ts.shape[0]))
return noisy_ts
def robustness(explanations, noisy_explanations):
robust = 0
for i in range(0,len(explanations)):
# print(explanations)  # debug
original_order = np.argsort(explanations[i][0])
noisy_order = np.argsort(noisy_explanations[i][0])
if np.array_equal(original_order,noisy_order[:len(original_order)]):
robust += 1
return robust/len(explanations)
def reverse_segment(ts, index0, index1):
perturbed_ts = ts.copy()
perturbed_ts[index0:index1] = np.flip(ts[index0:index1])
return perturbed_ts
def faithfulness(explanations, x_test, y_test, original_predictions, model, model_type):
perturbed_samples = []
for i in range(0,len(explanations)):
top_index = np.argmax(np.abs(explanations[i][0]))
segment_indices = explanations[i][1]+[-1]
example_ts = x_test[i].copy()
reversed_sample = reverse_segment(example_ts,segment_indices[top_index],segment_indices[top_index+1])
perturbed_samples.append(reversed_sample)
if model_type == 'proba':
reversed_predictions = model.predict(np.asarray(perturbed_samples))
correct_indexes = []
differences = []
for i in range(0,len(y_test)):
if y_test[i] == np.argmax(reversed_predictions[i]):
correct_indexes.append(i)
for index in correct_indexes:
prediction_index = int(np.argmax(original_predictions[index]))
differences.append(np.abs(original_predictions[index][prediction_index] - reversed_predictions[index][prediction_index]))
return np.mean(differences)
else:
reversed_samples = np.asarray(perturbed_samples)
reversed_predictions = model.predict(reversed_samples.reshape(reversed_samples.shape[:2]))
correct_indexes = []
for i in range(0,len(original_predictions)):
if original_predictions[i] == reversed_predictions[i]:
correct_indexes.append(i)
return len(correct_indexes)/len(original_predictions)
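# Usage sketch (illustrative; variable names are assumptions): `explanations` is expected
# to be a list of (segment_importances, segment_indexes) tuples, e.g. LIMESegment outputs.
#
#   explanations = [LIMESegment(x, model, model_type="proba") for x in x_test]
#   noisy_explanations = [LIMESegment(add_noise(x), model, model_type="proba") for x in x_test]
#   print("robustness:", robustness(explanations, noisy_explanations))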

@ -0,0 +1,61 @@
from scipy import signal
import numpy as np
from scipy.ndimage import gaussian_filter
def backgroundIdentification(original_signal):
f, t, Zxx = signal.stft(original_signal,1,nperseg=40)
frequency_composition_abs = np.abs(Zxx)
measures = []
for freq,freq_composition in zip(f,frequency_composition_abs):
measures.append(np.mean(freq_composition)/np.std(freq_composition))
max_value = max(measures)
selected_frequency = measures.index(max_value)
weights = 1-(measures/sum(measures))
dummymatrix = np.zeros((len(f),len(t)))
dummymatrix[selected_frequency,:] = 1
#Option to admit information from other frequency bands
"""dummymatrix = np.ones((len(f),len(t)))
for i in range(0,len(weights)):
dummymatrix[i,:] = dummymatrix[i,:] * weights[i]"""
background_frequency = Zxx * dummymatrix
_, xrec = signal.istft(background_frequency, 1)
return xrec
def RBP(generated_samples_interpretable, original_signal, segment_indexes):
generated_samples_raw = []
xrec = backgroundIdentification(original_signal)
for sample_interpretable in generated_samples_interpretable:
raw_signal = original_signal.copy()
for index in range(0,len(sample_interpretable)-1):
if sample_interpretable[index] == 0:
index0 = segment_indexes[index]
index1 = segment_indexes[index+1]
raw_signal[index0:index1] = xrec[index0:index1]
generated_samples_raw.append(np.asarray(raw_signal))
return np.asarray(generated_samples_raw)
def RBPIndividual(original_signal, index0, index1):
xrec = backgroundIdentification(original_signal)
raw_signal = original_signal.copy()
raw_signal[index0:index1] = xrec[index0:index1]
return raw_signal
def zeroPerturb(original_signal, index0, index1):
new_signal = original_signal.copy()
new_signal[index0:index1] = np.zeros(index1 - index0)
return new_signal
def noisePerturb(original_signal, index0, index1):
new_signal = original_signal.copy()
new_signal[index0:index1] = np.random.randint(-100, 100, index1 - index0)
return new_signal
def blurPerturb(original_signal, index0, index1):
new_signal = original_signal.copy()
new_signal[index0:index1] = gaussian_filter(new_signal[index0:index1],np.std(original_signal))
return new_signal

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

358
base/glacier/src/_guided.py Executable file

@ -0,0 +1,358 @@
import warnings
import numpy as np
import tensorflow as tf
from tensorflow import keras
from wildboar.explain import IntervalImportance
from .LIMESegment.Utils.explanations import LIMESegment
class ModifiedLatentCF:
"""Explanations by generating a counterfacutal sample in the latent space of
any autoencoder.
References
----------
Learning Time Series Counterfactuals via Latent Space Representations,
Wang, Z., Samsten, I., Mochaourab, R., Papapetrou, P., 2021.
in: International Conference on Discovery Science, pp. 369-384. https://doi.org/10.1007/978-3-030-88942-5_29
"""
def __init__(
self,
probability=0.5,
*,
tolerance=1e-6,
max_iter=100,
optimizer=None,
autoencoder=None,
pred_margin_weight=1.0, # weighted_steps_weight = 1 - pred_margin_weight
step_weights="local",
random_state=None,
):
"""
Parameters
----------
probability : float, optional
The desired probability assigned by the model
tolerance : float, optional
The maximum difference between the desired and assigned probability
optimizer :
Optimizer with a defined learning rate
max_iter : int, optional
The maximum number of iterations
autoencoder : keras.Model, optional
The autoencoder for the latent representation
- if None the sample is generated in the original space
- if given, the autoencoder is expected to have `k` encoder layers and `k`
decoder layers.
"""
self.optimizer_ = (
tf.optimizers.Adam(learning_rate=1e-4) if optimizer is None else optimizer
)
self.mse_loss_ = keras.losses.MeanSquaredError()
self.probability_ = tf.constant([probability])
self.tolerance_ = tf.constant(tolerance)
self.max_iter = max_iter
self.autoencoder = autoencoder
# Weights of the different loss components
self.pred_margin_weight = pred_margin_weight
self.weighted_steps_weight = 1 - self.pred_margin_weight
self.step_weights = step_weights
self.random_state = random_state
def fit(self, model):
"""Fit a new counterfactual explainer to the model
Parameters
----------
model : keras.Model
The model
"""
if self.autoencoder:
(
encode_input,
encode_output,
decode_input,
decode_output,
) = extract_encoder_decoder(self.autoencoder)
self.decoder_ = keras.Model(inputs=decode_input, outputs=decode_output)
self.encoder_ = keras.Model(inputs=encode_input, outputs=encode_output)
else:
self.decoder_ = None
self.encoder_ = None
self.model_ = model
return self
def predict(self, x):
"""Compute the difference between the desired and actual probability
Parameters
---------
x : Variable
Variable of the sample
"""
if self.autoencoder is None:
z = x
else:
z = self.decoder_(x)
return self.model_(z)
# The "pred_margin_loss" is designed to measure the prediction probability to the desired decision boundary
def pred_margin_mse(self, prediction):
return self.mse_loss_(self.probability_, prediction)
# An auxiliary MAE loss function to measure the proximity with step_weights
def weighted_mae(self, original_sample, cf_sample, step_weights):
return tf.math.reduce_mean(
tf.math.multiply(tf.math.abs(original_sample - cf_sample), step_weights)
)
# An auxiliary normalized L2 loss function to measure the proximity with step_weights
def weighted_normalized_l2(self, original_sample, cf_sample, step_weights):
var_diff = tf.math.reduce_variance(original_sample - cf_sample)
var_orig = tf.math.reduce_variance(original_sample)
var_cf = tf.math.reduce_variance(cf_sample)
normalized_l2 = 0.5 * var_diff / (var_orig + var_cf)
return tf.math.reduce_mean(
tf.math.multiply(
normalized_l2,
step_weights,
)
)
# additional input of step_weights
def compute_loss(self, original_sample, z_search, step_weights, target_label):
loss = tf.zeros(shape=())
decoded = self.decoder_(z_search) if self.autoencoder is not None else z_search
pred = self.model_(decoded)[:, target_label]
pred_margin_loss = self.pred_margin_mse(pred)
loss += self.pred_margin_weight * pred_margin_loss
weighted_steps_loss = self.weighted_mae(
original_sample=tf.cast(original_sample, dtype=tf.float32),
cf_sample=tf.cast(decoded, dtype=tf.float32),
step_weights=tf.cast(step_weights, tf.float32),
)
# weighted_steps_loss = self.weighted_normalized_l2(
# original_sample=tf.cast(original_sample, dtype=tf.float32),
# decoded=tf.cast(decoded, dtype=tf.float32),
# step_weights=tf.cast(step_weights, tf.float32)
# )
loss += self.weighted_steps_weight * weighted_steps_loss
return loss, pred_margin_loss, weighted_steps_loss
# TODO: compatible with the counterfactuals of wildboar
# i.e., define the desired output target per label
def transform(self, x, pred_labels):
"""Generate counterfactual explanations
x : array-like of shape [n_samples, n_timestep, n_dims]
The samples
"""
result_samples = np.empty(x.shape)
losses = np.empty(x.shape[0])
# `weights_all` needed for debugging
weights_all = np.empty((x.shape[0], 1, x.shape[1], x.shape[2]))
for i in range(x.shape[0]):
if i % 25 == 0:
print(f"{i+1} samples been transformed.")
# if self.step_weights == "global" OR "uniform"
if isinstance(self.step_weights, np.ndarray): # "global" OR "uniform"
step_weights = self.step_weights
elif self.step_weights == "local":
# ignore warning of matrix multiplication, from LIMESegment: `https://stackoverflow.com/questions/29688168/mean-nanmean-and-warning-mean-of-empty-slice`
# ignore warning of scipy package warning, from LIMESegment: `https://github.com/paulvangentcom/heartrate_analysis_python/issues/31`
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
warnings.simplefilter("ignore", category=UserWarning)
step_weights = get_local_weights(
x[i],
self.model_,
random_state=self.random_state,
pred_label=pred_labels[i],
)
else:
raise NotImplementedError(
"step_weights not implemented, please choose 'local', 'global' or 'uniform'."
)
# print(step_weights.reshape(-1))
x_sample, loss = self._transform_sample(
x[np.newaxis, i], step_weights, pred_labels[i]
)
result_samples[i] = x_sample
losses[i] = loss
weights_all[i] = step_weights
print(f"{i+1} samples been transformed, in total.")
return result_samples, losses, weights_all
def _transform_sample(self, x, step_weights, pred_label):
"""Generate counterfactual explanations
x : array-like of shape [n_samples, n_timestep, n_dims]
The samples
"""
# TODO: check_is_fitted(self)
if self.autoencoder is not None:
z = tf.Variable(self.encoder_(x))
else:
z = tf.Variable(x, dtype=tf.float32)
it = 0
target_label = 1 - pred_label # for binary classification
with tf.GradientTape() as tape:
loss, pred_margin_loss, weighted_steps_loss = self.compute_loss(
x, z, step_weights, target_label
)
if self.autoencoder is not None:
pred = self.model_(self.decoder_(z))
else:
pred = self.model_(z)
# # uncomment for debug
# print(
# f"current loss: {loss}, pred_margin_loss: {pred_margin_loss}, weighted_steps_loss: {weighted_steps_loss}, pred prob:{pred}, iter: {it}."
# )
# TODO: modify the loss to check both validity and proximity; how to design the condition here?
# while (pred_margin_loss > self.tolerance_ or pred[:, 1] < self.probability_ or weighted_steps_loss > self.step_tolerance_)?
# loss > tf.multiply(self.tolerance_rate_, loss_original)
#
while (
pred_margin_loss > self.tolerance_
or pred[:, target_label] < self.probability_
) and (it < self.max_iter if self.max_iter else True):
# Get gradients of loss wrt the sample
grads = tape.gradient(loss, z)
# Update the weights of the sample
self.optimizer_.apply_gradients([(grads, z)])
with tf.GradientTape() as tape:
loss, pred_margin_loss, weighted_steps_loss = self.compute_loss(
x, z, step_weights, target_label
)
it += 1
if self.autoencoder is not None:
pred = self.model_(self.decoder_(z))
else:
pred = self.model_(z)
# # uncomment for debug
# print(
# f"current loss: {loss}, pred_margin_loss: {pred_margin_loss}, weighted_steps_loss: {weighted_steps_loss}, pred prob:{pred}, iter: {it}. \n"
# )
res = z.numpy() if self.autoencoder is None else self.decoder_(z).numpy()
return res, float(loss)
def extract_encoder_decoder(autoencoder):
"""Extract the encoder and decoder from an autoencoder
autoencoder : keras.Model
The autoencoder of `k` encoders and `k` decoders
"""
depth = len(autoencoder.layers) // 2
encoder = autoencoder.layers[1](autoencoder.input)
for i in range(2, depth):
encoder = autoencoder.layers[i](encoder)
encode_input = keras.Input(shape=encoder.shape[1:])
decoder = autoencoder.layers[depth](encode_input)
for i in range(depth + 1, len(autoencoder.layers)):
decoder = autoencoder.layers[i](decoder)
return autoencoder.input, encoder, encode_input, decoder
def get_local_weights(
input_sample, classifier_model, random_state=None, pred_label=None
):
n_timesteps, n_dims = input_sample.shape # n_dims=1
# for binary classification, default to 1
desired_label = int(1 - pred_label) if pred_label is not None else 1
seg_imp, seg_idx = LIMESegment(
input_sample,
classifier_model,
model_type=desired_label,
cp=10,
window_size=10,
random_state=random_state,
)
if desired_label == 1:
# calculate the threshold of masking, lower 25 percentile (neg contribution for pos class)
masking_threshold = np.percentile(seg_imp, 25)
masking_idx = np.where(seg_imp <= masking_threshold)
else: # desired_label == 0
# calculate the threshold of masking, upper 25 percentile (pos contribution for neg class)
masking_threshold = np.percentile(seg_imp, 75)
masking_idx = np.where(seg_imp >= masking_threshold)
weighted_steps = np.ones(n_timesteps)
for start_idx in masking_idx[0]:
weighted_steps[seg_idx[start_idx] : seg_idx[start_idx + 1]] = 0
# need to reshape for multiplication in `tf.math.multiply()`
weighted_steps = weighted_steps.reshape(1, n_timesteps, n_dims)
return weighted_steps
def get_global_weights(
input_samples, input_labels, classifier_model, random_state=None
):
n_samples, n_timesteps, n_dims = input_samples.shape # n_dims=1
class ModelWrapper:
def __init__(self, model):
self.model = model
def predict(self, X):
p = self.model.predict(X.reshape(n_samples, n_timesteps, 1))
return np.argmax(p, axis=1)
def fit(self, X, y):
return self.model.fit(X, y)
clf = ModelWrapper(classifier_model)
clf.fit(input_samples.reshape(input_samples.shape[0], -1), input_labels)
i = IntervalImportance(scoring="accuracy", n_intervals=10, random_state=random_state)
i.fit(clf, input_samples.reshape(input_samples.shape[0], -1), input_labels)
# calculate the threshold of masking, 75 percentile
masking_threshold = np.percentile(i.importances_.mean, 75)
masking_idx = np.where(i.importances_.mean >= masking_threshold)
weighted_steps = np.ones(n_timesteps)
seg_idx = i.intervals_
for start_idx in masking_idx[0]:
weighted_steps[seg_idx[start_idx][0] : seg_idx[start_idx][1]] = 0
# need to reshape for multiplication in `tf.math.multiply()`
weighted_steps = weighted_steps.reshape(1, n_timesteps, 1)
return weighted_steps
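# Usage sketch (illustrative; variable names are assumptions): wiring ModifiedLatentCF
# to a trained Keras classifier and (optionally) a trained autoencoder.
#
#   step_weights = get_global_weights(X_test, y_test, trained_classifier, random_state=39)
#   cf_model = ModifiedLatentCF(
#       probability=0.5,                      # decision boundary threshold tau
#       autoencoder=trained_autoencoder,      # or None to search in the original space
#       optimizer=tf.optimizers.Adam(learning_rate=1e-4),
#       pred_margin_weight=0.5,               # weight w of the prediction-margin term
#       step_weights=step_weights,            # or "local" / np.ones(...) / np.zeros(...)
#   )
#   cf_model.fit(trained_classifier)
#   cf_samples, losses, weights = cf_model.transform(X_test, predicted_labels)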

@ -0,0 +1,263 @@
#!/usr/bin/env python
# coding: utf-8
import logging
import os
from argparse import ArgumentParser
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import balanced_accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split, StratifiedKFold
from tensorflow import keras
from keras.utils import to_categorical
from wildboar.datasets import load_dataset
import pickle
from wildboar.explain import *
from .help_functions import (
ResultWriter,
conditional_pad,
# remove_paddings,
# evaluate,
# find_best_lr,
reset_seeds,
time_series_normalize,
upsample_minority,
# fit_evaluation_models,
# time_series_revert,
)
from .keras_models import *
# from ._guided import get_global_weights
class ModelWrapper:
def __init__(self, model):
self.model = model
def predict(self, X):
p = self.model.predict(X.reshape(X.shape[0], -1, 1))
return np.argmax(p, axis=1)  # return hard class labels
def fit(self, X, y):
return self.model.fit(X, y)
def gc_latentcf_search_1dcnn(dataset, pos, neg, output, path, autoencoder="No"):
os.environ["TF_DETERMINISTIC_OPS"] = "1"
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
session = tf.compat.v1.Session(config=config)
logger = logging.getLogger(__name__)
print(f"Num GPUs Available: {len(tf.config.list_physical_devices('GPU'))}.")
numba_logger = logging.getLogger("numba")
numba_logger.setLevel(logging.WARNING)
# logger.info(f"LR list: {lr_list}.") # for debugging
# logger.info(f"W type: {w_type}.") # for debugging
RANDOM_STATE = 39
# PRED_MARGIN_W_LIST = [1.0, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0.0] # TODO: write another script for ablation study
# pred_margin_weight = w_value
# logger.info(f"W value: {pred_margin_weight}.") # for debugging
# logger.info(f"Tau value: {tau_value}.") # for debugging
result_writer = ResultWriter(file_name=output, dataset_name=dataset)
print(f"Result writer is ready, writing to {output}...")
# If the output file already exists, no need to write the header (directly append)
if not os.path.isfile(output):
result_writer.write_head()
X = dataset.iloc[:, :-1].values
y = dataset.iloc[:, -1].values
pos_label, neg_label = 1, 0
y_copy = y.copy()
if pos != pos_label:
y_copy[y == pos] = pos_label # convert/normalize positive label to 1
if neg != neg_label:
y_copy[y == neg] = neg_label # convert negative label to 0
# skf = StratifiedKFold(n_splits=0, shuffle=True, random_state=RANDOM_STATE)
fold_idx = 0
# for train_index, test_index in skf.split(X, y_copy):
train_index, test_index = train_test_split(np.arange(X.shape[0]), test_size=0.8, random_state=42)
# lr_list_temp = lr_list
X_train, X_test = X[train_index], X[test_index]
y_train, y_test = y_copy[train_index], y_copy[test_index]
# Get 50 samples for CF evaluation if test size larger than 50
test_size = len(y_test)
if test_size >= 50:
try:
test_indices = np.arange(test_size)
_, _, _, rand_test_idx = train_test_split(
y_test,
test_indices,
test_size=50,
random_state=RANDOM_STATE,
stratify=y_test,
)
except (
ValueError
): # ValueError: The train_size = 1 should be greater or equal to the number of classes = 2
rand_test_idx = np.arange(test_size)
else:
rand_test_idx = np.arange(test_size)
pd.DataFrame(X_test[rand_test_idx]).to_csv(path + "/X_test.csv", index=None)
np.save(path + "/y_test", (y_test[rand_test_idx]))
np.save(path + "/y_train", (y_train))
X_train, X_val, y_train, y_val = train_test_split(
X_train,
y_train,
test_size=0.125,
random_state=RANDOM_STATE,
stratify=y_train,
)
# Upsample the minority class
y_train_copy = y_train.copy()
X_train, y_train = upsample_minority(
X_train, y_train, pos_label=pos_label, neg_label=neg_label
)
if y_train.shape != y_train_copy.shape:
print(
f"Data upsampling performed, current distribution of y_train: \n{pd.value_counts(y_train)}."
)
else:
print(
f"Current distribution of y_train: \n{pd.value_counts(y_train)}."
)
nb_classes = len(np.unique(y_train))
y_train_classes, y_val_classes, y_test_classes = (
y_train.copy(),
y_val.copy(),
y_test.copy(),
)
y_train, y_val, y_test = (
to_categorical(y_train, nb_classes),
to_categorical(y_val, nb_classes),
to_categorical(y_test, nb_classes),
)
# ### 1.1 Normalization - fit scaler using training data
n_training, n_timesteps = X_train.shape
n_features = 1
X_train_processed, trained_scaler = time_series_normalize(
data=X_train, n_timesteps=n_timesteps
)
X_val_processed, _ = time_series_normalize(
data=X_val, n_timesteps=n_timesteps, scaler=trained_scaler
)
X_test_processed, _ = time_series_normalize(
data=X_test, n_timesteps=n_timesteps, scaler=trained_scaler
)
# add extra padding zeros if n_timesteps cannot be divided by 4, required for 1dCNN autoencoder structure
X_train_processed_padded, padding_size = conditional_pad(X_train_processed)
X_val_processed_padded, _ = conditional_pad(X_val_processed)
X_test_processed_padded, _ = conditional_pad(X_test_processed)
n_timesteps_padded = X_train_processed_padded.shape[1]
print(
f"Data pre-processed, original #timesteps={n_timesteps}, padded #timesteps={n_timesteps_padded}."
)
# ## 2. LatentCF models
# reset seeds for numpy, tensorflow, python random package and python environment seed
reset_seeds()
###############################################
# ## 2.0 1DCNN classifier
###############################################
# ### 1DCNN classifier
classifier = Classifier(n_timesteps_padded, n_features, n_output=2)
optimizer = keras.optimizers.legacy.Adam(learning_rate=0.0001)
classifier.compile(
optimizer=optimizer, loss="binary_crossentropy", metrics=["accuracy"]
)
# Define the early stopping criteria
early_stopping_accuracy = keras.callbacks.EarlyStopping(
monitor="val_accuracy", patience=30, restore_best_weights=True
)
# Train the model
reset_seeds()
logger.info("Training log for 1DCNN classifier:")
classifier_history = classifier.fit(
X_train_processed_padded,
y_train,
epochs=150,
batch_size=32,
shuffle=True,
verbose=True,
validation_data=(X_val_processed_padded, y_val),
callbacks=[early_stopping_accuracy],
)
print(classifier_history)
y_pred = classifier.predict(X_test_processed_padded)
y_pred_classes = np.argmax(y_pred, axis=1)
# clf = ModelWrapper(classifier)
# i = IntervalImportance(scoring="accuracy", n_intervals=10, random_state=RANDOM_STATE)
# i.fit(clf, X_train_processed_padded.reshape(X_train_processed_padded.shape[0], -1), y_train_classes)
# pickle.dump(i, open(path+f"/interval_importance.sav", "wb"))
acc = balanced_accuracy_score(y_true=y_test_classes, y_pred=y_pred_classes)
print(f"1dCNN classifier trained, with test accuracy {acc}.")
confusion_matrix_df = pd.DataFrame(
confusion_matrix(
y_true=y_test_classes, y_pred=y_pred_classes, labels=[1, 0]
),
index=["True:pos", "True:neg"],
columns=["Pred:pos", "Pred:neg"],
)
print(f"Confusion matrix: \n{confusion_matrix_df}.")
# ###############################################
# # ## 2.1 CF search with 1dCNN autoencoder
# ###############################################
# # ### 1dCNN autoencoder
if autoencoder=="Yes":
autoencoder = Autoencoder(n_timesteps_padded, n_features)
optimizer = keras.optimizers.legacy.Adam(learning_rate=0.0005)
autoencoder.compile(optimizer=optimizer, loss="mse")
# Define the early stopping criteria
early_stopping = keras.callbacks.EarlyStopping(
monitor="val_loss", min_delta=0.0001, patience=5, restore_best_weights=True
)
# Train the model
reset_seeds()
logger.info("Training log for 1dCNN autoencoder:")
autoencoder_history = autoencoder.fit(
X_train_processed_padded,
X_train_processed_padded,
epochs=50,
batch_size=32,
shuffle=True,
verbose=True,
validation_data=(X_val_processed_padded, X_val_processed_padded),
callbacks=[early_stopping],
)
ae_val_loss = np.min(autoencoder_history.history["val_loss"])
pickle.dump(autoencoder, open(path+f"/autoencoder.sav", "wb"))
logger.info(f"1dCNN autoencoder trained, with validation loss: {ae_val_loss}.")
pickle.dump(classifier, open(path+f"/classifier.sav", "wb"))
# logger.info(
# f"Done for CF search [No autoencoder], pred_margin_weight={pred_margin_weight}."
# )
# logger.info("Done.")
# python src/gc_latentcf_search.py --dataset TwoLeadECG --pos 1 --neg 2 --output twoleadecg-outfile.csv --n-lstmcells 8 --w-type unconstrained --w-value 1 --tau-value 0.5 --lr-list 0.0001 0.0001 0.0001;
# gc_latentcf_search_1dcnn("TwoLeadECG", 1, 2, "twoleadecg-outfile.csv", 8, "unconstrained", 1, 0.5, [0.0001, 0.0001, 0.0001], "/home/lakes/extremum/base/glacier/src")

@ -0,0 +1,134 @@
#!/usr/bin/env python
# coding: utf-8
import numpy as np
import pandas as pd
from tensorflow import keras
from keras.utils import to_categorical
from sklearn.metrics import balanced_accuracy_score, confusion_matrix
from .help_functions import (
conditional_pad,
remove_paddings,
find_best_lr,
reset_seeds,
time_series_normalize,
time_series_revert,
)
from .keras_models import *
from ._guided import get_global_weights
def gc_compute_counterfactuals(
data_path, output_path, type, l, w_value, tau_value, classifier, autoencoder=None
):
testdata = pd.read_csv(data_path + "/X_test.csv", index_col=None)
X = pd.DataFrame(testdata).values
y = np.load(data_path + "/y_test.npy")
y_train = np.load(data_path + "/y_train.npy")
# X, y = X_test.iloc[:, :-1].values, X_test.iloc[:, -1].values
n_training, n_timesteps = X.shape
n_features = 1
x_processed, trained_scaler = time_series_normalize(data=X, n_timesteps=n_timesteps)
# add extra padding zeros if n_timesteps cannot be divided by 4, required for 1dCNN autoencoder structure
x_processed_padded, padding_size = conditional_pad(x_processed)
n_timesteps_padded = x_processed_padded.shape[1]
# ## 2. LatentCF models
# reset seeds for numpy, tensorflow, python random package and python environment seed
reset_seeds()
###############################################
# ## 2.0 1DCNN classifier
###############################################
# ### 1DCNN classifier
print("\t\t Y test: ", y)
y_pred = classifier.predict(x_processed_padded)
y_pred_classes = np.argmax(y_pred, axis=1)
acc = balanced_accuracy_score(y_true=y, y_pred=y_pred_classes)
np.save(output_path + "/y_pred.npy", y_pred_classes)
print("\t\t Y pred: ", y_pred_classes)
nb_classes = len(np.unique(y))
y_classes = y.copy()
y_classes = (
to_categorical(y_classes, nb_classes)
)
# ### 2.0.1 Get `step_weights` based on the input argument
if type == "global":
step_weights = get_global_weights(
x_processed_padded,
y_classes,
classifier,
random_state=39,
)
elif type == "uniform":
step_weights = np.ones((1, n_timesteps_padded, n_features))
elif type.lower() == "local":
step_weights = "local"
elif type == "unconstrained":
step_weights = np.zeros((1, n_timesteps_padded, n_features))
else:
raise NotImplementedError(
"A.w_type not implemented, please choose 'local', 'global', 'uniform', or 'unconstrained'."
)
pred_margin_weight = w_value
rand_X_test = x_processed_padded
rand_y_pred = y_pred_classes
# l = [0.0001, 0.0001, 0.0001]
lr_list3 = [l[0]] if l[0] is not None else [0.00001]
if autoencoder:
best_lr3, best_cf_model3, best_cf_samples3, _ = find_best_lr(
classifier,
X_samples=rand_X_test,
pred_labels=rand_y_pred,
autoencoder=autoencoder,
lr_list=lr_list3,
pred_margin_weight=pred_margin_weight,
step_weights=step_weights,
random_state=39,
padding_size=padding_size,
target_prob=tau_value,
)
else:
best_lr3, best_cf_model3, best_cf_samples3, _ = find_best_lr(
classifier,
X_samples=rand_X_test,
pred_labels=rand_y_pred,
lr_list=lr_list3,
pred_margin_weight=pred_margin_weight,
step_weights=step_weights,
random_state=39,
padding_size=padding_size,
target_prob=tau_value,
)
# ### Evaluation metrics
# predicted probabilities of CFs
z_pred3 = classifier.predict(best_cf_samples3)
cf_pred_labels3 = np.argmax(z_pred3, axis=1)
np.save(output_path + "/cf_pred.npy", cf_pred_labels3)
print("\t\t CF pred: ", cf_pred_labels3)
# remove extra paddings after counterfactual generation in 1dCNN autoencoder
best_cf_samples3 = remove_paddings(best_cf_samples3, padding_size)
best_cf_samples3_reverted = time_series_revert(
best_cf_samples3, n_timesteps, n_features, trained_scaler
)
np.save(output_path + "/X_cf.npy", best_cf_samples3_reverted)
cf_dataframe = pd.DataFrame(best_cf_samples3_reverted.reshape(-1, n_timesteps))
cf_dataframe[len(cf_dataframe.columns)] = cf_pred_labels3
cf_dataframe.to_csv(output_path + "/cf_test.csv", header=None, index=None)

@ -0,0 +1,449 @@
import os
import csv
import random as python_random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import resample, shuffle
from sklearn.metrics import accuracy_score
from sklearn.neighbors import LocalOutlierFactor, NearestNeighbors
from ._guided import ModifiedLatentCF
# from keras import backend as K
class ResultWriter:
def __init__(self, file_name, dataset_name):
self.file_name = file_name
self.dataset_name = dataset_name
def write_head(self):
# write the head in csv file
with open(self.file_name, "w") as f:
writer = csv.writer(f)
writer.writerow(
[
"dataset",
"fold_id",
"method",
"classifier_accuracy",
"autoencoder_loss",
"best_lr",
"proximity",
"validity",
"lof_score",
"relative_proximity",
"compactness",
"pred_margin_weight",
"step_weight_type",
"threshold_tau",
]
)
def write_result(
self,
fold_id,
method_name,
acc,
ae_loss,
best_lr,
evaluate_res,
pred_margin_weight=1.0,
step_weight_type="",
threshold_tau=0.5,
):
(
proxi,
valid,
lof_score,
relative_proximity,
compactness,
) = evaluate_res
with open(self.file_name, "a") as f:
writer = csv.writer(f)
writer.writerow(
[
self.dataset_name,
fold_id,
method_name,
acc,
ae_loss,
best_lr,
proxi,
valid,
lof_score,
relative_proximity,
compactness,
pred_margin_weight,
step_weight_type,
threshold_tau,
]
)
"""
time series scaling
"""
def time_series_normalize(data, n_timesteps, n_features=1, scaler=None):
# reshape data to 1 column
data_reshaped = data.reshape(-1, 1)
if scaler is None:
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(data_reshaped)
normalized = scaler.transform(data_reshaped)
# return reshaped data into [samples, timesteps, features]
return normalized.reshape(-1, n_timesteps, n_features), scaler
def time_series_revert(normalized_data, n_timesteps, n_features=1, scaler=None):
# reshape data to 1 column
data_reshaped = normalized_data.reshape(-1, 1)
reverted = scaler.inverse_transform(data_reshaped)
# return reverted data into [samples, timesteps, features]
return reverted.reshape(-1, n_timesteps, n_features)
"""
data pre-processing
"""
def conditional_pad(X):
num = X.shape[1]
if num % 4 != 0:
# find the next integer that can be divided by 4
next_num = (int(num / 4) + 1) * 4
padding_size = next_num - num
X_padded = np.pad(
X, pad_width=((0, 0), (0, padding_size), (0, 0))
) # pad for 3d array
return X_padded, padding_size
# else return the original X
return X, 0 # padding size = 0
def remove_paddings(cf_samples, padding_size):
if padding_size != 0:
# use np.squeeze() to cut the last time-series dimension, for evaluation
cf_samples = np.squeeze(cf_samples[:, :-padding_size, :])
else:
cf_samples = np.squeeze(cf_samples)
return cf_samples
# Upsampling the minority class
def upsample_minority(X, y, pos_label=1, neg_label=0, random_state=39):
# Get counts
pos_counts = pd.value_counts(y)[pos_label]
neg_counts = pd.value_counts(y)[neg_label]
# Divide by class
X_pos, X_neg = X[y == pos_label], X[y == neg_label]
if pos_counts == neg_counts:
# Balanced dataset
return X, y
elif pos_counts > neg_counts:
# Imbalanced dataset
X_neg_over = resample(
X_neg, replace=True, n_samples=pos_counts, random_state=random_state
)
X_concat = np.concatenate([X_pos, X_neg_over], axis=0)
y_concat = np.array(
[pos_label for i in range(pos_counts)]
+ [neg_label for j in range(pos_counts)]
)
else:
# Imbalanced dataset
X_pos_over = resample(
X_pos, replace=True, n_samples=neg_counts, random_state=random_state
)
X_concat = np.concatenate([X_pos_over, X_neg], axis=0)
y_concat = np.array(
[pos_label for i in range(neg_counts)]
+ [neg_label for j in range(neg_counts)]
)
# Shuffle the index after up-sampling
X_concat, y_concat = shuffle(X_concat, y_concat, random_state=random_state)
return X_concat, y_concat
"""
deep models needed
"""
# Method: For plotting the accuracy/loss of keras models
def plot_graphs(history, string):
plt.plot(history.history[string])
plt.plot(history.history["val_" + string])
plt.xlabel("Epochs")
plt.ylabel(string)
plt.legend([string, "val_" + string])
plt.show()
# Method: Fix the random seeds to get consistent models
def reset_seeds(seed_value=39):
# ref: https://keras.io/getting_started/faq/#how-can-i-obtain-reproducible-results-using-keras-during-development
os.environ["PYTHONHASHSEED"] = str(seed_value)
# necessary for starting Numpy generated random numbers in a well-defined initial state.
np.random.seed(seed_value)
# necessary for starting core Python generated random numbers in a well-defined state.
python_random.seed(seed_value)
# set_seed() will make random number generation
tf.random.set_seed(seed_value)
"""
evaluation metrics
"""
def fit_evaluation_models(n_neighbors_lof, n_neighbors_nn, training_data):
# Fit the LOF model for novelty detection (novelty=True)
lof_estimator = LocalOutlierFactor(
n_neighbors=n_neighbors_lof,
novelty=True,
metric="euclidean",
)
lof_estimator.fit(training_data)
# Fit an unsupervised 1NN with all the training samples from the desired class
nn_model = NearestNeighbors(n_neighbors=n_neighbors_nn, metric="euclidean")
nn_model.fit(training_data)
return lof_estimator, nn_model
def evaluate(
X_pred_neg,
cf_samples,
pred_labels,
cf_labels,
lof_estimator_pos,
lof_estimator_neg,
nn_estimator_pos,
nn_estimator_neg,
):
proxi = euclidean_distance(X_pred_neg, cf_samples)
valid = validity_score(pred_labels, cf_labels)
compact = compactness_score(X_pred_neg, cf_samples)
# TODO: add LOF and RP score for debugging training?
lof_score = calculate_lof(
cf_samples, pred_labels, lof_estimator_pos, lof_estimator_neg
)
rp_score = relative_proximity(
X_pred_neg, cf_samples, pred_labels, nn_estimator_pos, nn_estimator_neg
)
return proxi, valid, lof_score, rp_score, compact
def euclidean_distance(X, cf_samples, average=True):
paired_distances = np.linalg.norm(X - cf_samples, axis=1)
return np.mean(paired_distances) if average else paired_distances
def validity_score(pred_labels, cf_labels):
desired_labels = 1 - pred_labels # for binary classification
return accuracy_score(y_true=desired_labels, y_pred=cf_labels)
# originally from: https://github.com/isaksamsten/wildboar/blob/859758884677ba32a601c53a5e2b9203a644aa9c/src/wildboar/metrics/_counterfactual.py#L279
def compactness_score(X, cf_samples):
# absolute tolerance atol=0.01, 0.001, OR 0.0001?
c = np.isclose(X, cf_samples, atol=0.01)
# return a positive compactness, instead of 1 - np.mean(..)
return np.mean(c, axis=(1, 0))
# def sax_compactness(X, cf_samples, n_timesteps):
# from wildboar.transform import symbolic_aggregate_approximation
# X = symbolic_aggregate_approximation(X, window=window, n_bins=n_bins)
# cf_samples = symbolic_aggregate_approximation(
# cf_samples, window=window, n_bins=n_bins
# )
# # absolute tolerance atol=0.01, 0.001, OR 0.0001?
# c = np.isclose(X, cf_samples, atol=0.01)
# return np.mean(1 - np.sum(c, axis=1) / n_timesteps)
def calculate_lof(cf_samples, pred_labels, lof_estimator_pos, lof_estimator_neg):
desired_labels = 1 - pred_labels # for binary classification
pos_idx, neg_idx = (
np.where(desired_labels == 1)[0], # pos_label = 1
np.where(desired_labels == 0)[0],  # neg_label = 0
)
# check if the NumPy array is empty
if pos_idx.any():
y_pred_cf1 = lof_estimator_pos.predict(cf_samples[pos_idx])
n_error_cf1 = y_pred_cf1[y_pred_cf1 == -1].size
else:
n_error_cf1 = 0
if neg_idx.any():
y_pred_cf2 = lof_estimator_neg.predict(cf_samples[neg_idx])
n_error_cf2 = y_pred_cf2[y_pred_cf2 == -1].size
else:
n_error_cf2 = 0
lof_score = (n_error_cf1 + n_error_cf2) / cf_samples.shape[0]
return lof_score
def relative_proximity(
X_inputs, cf_samples, pred_labels, nn_estimator_pos, nn_estimator_neg
):
desired_labels = 1 - pred_labels # for binary classification
nn_distance_list = np.array([])
proximity_list = np.array([])
pos_idx, neg_idx = (
np.where(desired_labels == 1)[0], # pos_label = 1
np.where(desired_labels == 0)[0], # neg_label = 0
)
if pos_idx.any():
nn_distances1, _ = nn_estimator_pos.kneighbors(
X_inputs[pos_idx], return_distance=True
)
nn_distances1 = np.squeeze(nn_distances1, axis=-1)
proximity1 = euclidean_distance(
X_inputs[pos_idx], cf_samples[pos_idx], average=False
)
nn_distance_list = np.concatenate((nn_distance_list, nn_distances1), axis=0)
proximity_list = np.concatenate((proximity_list, proximity1), axis=0)
if neg_idx.any():
nn_distances2, _ = nn_estimator_neg.kneighbors(
X_inputs[neg_idx], return_distance=True
)
nn_distances2 = np.squeeze(nn_distances2, axis=-1)
proximity2 = euclidean_distance(
X_inputs[neg_idx], cf_samples[neg_idx], average=False
)
nn_distance_list = np.concatenate((nn_distance_list, nn_distances2), axis=0)
proximity_list = np.concatenate((proximity_list, proximity2), axis=0)
# TODO: paired proximity score for (X_pred_neg, cf_samples), if not average (?)
# relative_proximity = proximity / nn_distances.mean()
relative_proximity = proximity_list.mean() / nn_distance_list.mean()
return relative_proximity
"""
counterfactual model needed
"""
def find_best_lr(
classifier,
X_samples,
pred_labels,
autoencoder=None,
encoder=None,
decoder=None,
lr_list=[0.001, 0.0001],
pred_margin_weight=1.0,
step_weights=None,
random_state=None,
padding_size=0,
target_prob=0.5,
):
# Find the best alpha for vanilla LatentCF
best_cf_model, best_cf_samples, best_cf_embeddings = None, None, None
best_losses, best_valid_frac, best_lr = 0, -1, 0
for lr in lr_list:
print(f"======================== CF search started, with lr={lr}.")
# Fit the LatentCF model
# TODO: fix the class name here: ModifiedLatentCF or GuidedLatentCF? from _guided or _composite?
if encoder and decoder:
cf_model = ModifiedLatentCF(
probability=target_prob,
only_encoder=encoder,
only_decoder=decoder,
optimizer=tf.optimizers.Adam(learning_rate=lr),
pred_margin_weight=pred_margin_weight,
step_weights=step_weights,
random_state=random_state,
)
else:
## here
cf_model = ModifiedLatentCF(
probability=target_prob,
autoencoder=autoencoder,
optimizer=tf.optimizers.Adam(learning_rate=lr),
pred_margin_weight=pred_margin_weight,
step_weights=step_weights,
random_state=random_state,
)
## here
cf_model.fit(classifier)
if encoder and decoder:
cf_embeddings, losses, _ = cf_model.transform(X_samples, pred_labels)
cf_samples = decoder.predict(cf_embeddings)
# predicted probabilities of CFs
z_pred = classifier.predict(cf_embeddings)
cf_pred_labels = np.argmax(z_pred, axis=1)
else:
## here
cf_samples, losses, _ = cf_model.transform(X_samples, pred_labels)
# predicted probabilities of CFs
z_pred = classifier.predict(cf_samples)
cf_pred_labels = np.argmax(z_pred, axis=1)
## here
valid_frac = validity_score(pred_labels, cf_pred_labels)
proxi_score = euclidean_distance(
remove_paddings(X_samples, padding_size),
remove_paddings(cf_samples, padding_size),
)
# uncomment for debugging
print(f"lr={lr} finished. Validity: {valid_frac}, proximity: {proxi_score}.")
# TODO: fix (padding) dimensions of `lof_estimator` and `nn_estimator` during training, for debugging
# proxi_score, valid_frac, lof_score, rp_score, cost_mean, cost_std = evaluate(
# X_pred_neg=X_samples,
# cf_samples=cf_samples,
# z_pred=z_pred,
# n_timesteps=_,
# lof_estimator=lof_estimator,
# nn_estimator=nn_estimator,
# )
# if valid_frac >= best_valid_frac and proxi_score <= best_proxi_score:
if valid_frac >= best_valid_frac:
best_cf_model, best_cf_samples = cf_model, cf_samples
best_losses, best_lr, best_valid_frac = losses, lr, valid_frac
if encoder and decoder:
best_cf_embeddings = cf_embeddings
return best_lr, best_cf_model, best_cf_samples, best_cf_embeddings

260
base/glacier/src/keras_models.py Executable file

@ -0,0 +1,260 @@
from tensorflow import keras
"""
1dCNN models
"""
def Autoencoder(n_timesteps, n_features):
# Define encoder and decoder structure
def Encoder(input):
x = keras.layers.Conv1D(
filters=64, kernel_size=3, activation="relu", padding="same"
)(input)
x = keras.layers.MaxPool1D(pool_size=2, padding="same")(x)
x = keras.layers.Conv1D(
filters=32, kernel_size=3, activation="relu", padding="same"
)(x)
x = keras.layers.MaxPool1D(pool_size=2, padding="same")(x)
return x
def Decoder(input):
x = keras.layers.Conv1D(
filters=32, kernel_size=3, activation="relu", padding="same"
)(input)
x = keras.layers.UpSampling1D(size=2)(x)
x = keras.layers.Conv1D(
filters=64, kernel_size=3, activation="relu", padding="same"
)(x)
# x = keras.layers.Conv1D(filters=64, kernel_size=2, activation="relu")(x)
x = keras.layers.UpSampling1D(size=2)(x)
x = keras.layers.Conv1D(
filters=1, kernel_size=3, activation="linear", padding="same"
)(x)
return x
# Define the AE model
orig_input = keras.Input(shape=(n_timesteps, n_features))
autoencoder = keras.Model(inputs=orig_input, outputs=Decoder(Encoder(orig_input)))
return autoencoder
def Classifier(
n_timesteps, n_features, n_conv_layers=1, add_dense_layer=True, n_output=1
):
# https://keras.io/examples/timeseries/timeseries_classification_from_scratch/
inputs = keras.Input(shape=(n_timesteps, n_features), dtype="float32")
if add_dense_layer:
x = keras.layers.Dense(128)(inputs)
else:
x = inputs
for i in range(n_conv_layers):
x = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.ReLU()(x)
x = keras.layers.MaxPooling1D(pool_size=2, padding="same")(x)
x = keras.layers.Flatten()(x)
if n_output >= 2:
outputs = keras.layers.Dense(n_output, activation="softmax")(x)
else:
outputs = keras.layers.Dense(1, activation="sigmoid")(x)
classifier = keras.models.Model(inputs=inputs, outputs=outputs)
return classifier
"""
LSTM models
"""
def AutoencoderLSTM(n_timesteps, n_features):
# Define encoder and decoder structure
# structure from medium post: https://towardsdatascience.com/step-by-step-understanding-lstm-autoencoder-layers-ffab055b6352
def EncoderLSTM(input):
# x = keras.layers.LSTM(64, activation='relu', return_sequences=True)(input)
x = keras.layers.LSTM(64, activation="tanh", return_sequences=True)(input)
# encoded = keras.layers.LSTM(32, activation='relu', return_sequences=False)(x)
encoded = keras.layers.LSTM(32, activation="tanh", return_sequences=False)(x)
return encoded
def DecoderLSTM(encoded):
x = keras.layers.RepeatVector(n_timesteps)(encoded)
# x = keras.layers.LSTM(32, activation='relu', return_sequences=True)(x)
x = keras.layers.LSTM(32, activation="tanh", return_sequences=True)(x)
# x = keras.layers.LSTM(64, activation='relu', return_sequences=True)(x)
x = keras.layers.LSTM(64, activation="tanh", return_sequences=True)(x)
decoded = keras.layers.TimeDistributed(
keras.layers.Dense(n_features, activation="sigmoid")
)(x)
return decoded
# Define the AE model
orig_input2 = keras.Input(shape=(n_timesteps, n_features))
autoencoder2 = keras.Model(
inputs=orig_input2, outputs=DecoderLSTM(EncoderLSTM(orig_input2))
)
return autoencoder2
def ClassifierLSTM(n_timesteps, n_features, extra_lstm_layer=True, n_output=1):
# Define the model structure - only LSTM layers
# https://www.kaggle.com/szaitseff/classification-of-time-series-with-lstm-rnn
inputs = keras.Input(shape=(n_timesteps, n_features), dtype="float32")
if extra_lstm_layer:
x = keras.layers.LSTM(64, activation="tanh", return_sequences=True)(
inputs
) # set return_sequences true to feed next LSTM layer
else:
x = keras.layers.LSTM(32, activation="tanh", return_sequences=False)(
inputs
) # set return_sequences false to feed dense layer directly
x = keras.layers.BatchNormalization()(x)
# x = keras.layers.LSTM(32, activation='tanh', return_sequences=True)(x)
# x = keras.layers.BatchNormalization()(x)
if extra_lstm_layer:
x = keras.layers.LSTM(16, activation="tanh", return_sequences=False)(x)
x = keras.layers.BatchNormalization()(x)
if n_output >= 2:
outputs = keras.layers.Dense(n_output, activation="softmax")(x)
else:
outputs = keras.layers.Dense(1, activation="sigmoid")(x)
classifier2 = keras.Model(inputs, outputs)
return classifier2
"""
composite autoencoder
"""
def CompositeAutoencoder(n_timesteps, n_features, n_output_classifier=1):
# https://machinelearningmastery.com/lstm-autoencoders/
# https://stackoverflow.com/questions/48603328/how-do-i-split-an-convolutional-autoencoder
# Composite 1dCNN Autoencoder
# encoder:
inputs = keras.layers.Input(shape=(n_timesteps, n_features))
x = keras.layers.Conv1D(
filters=64, kernel_size=3, activation="relu", padding="same"
)(inputs)
x = keras.layers.MaxPool1D(pool_size=2, padding="same")(x)
x = keras.layers.Conv1D(
filters=32, kernel_size=3, activation="relu", padding="same"
)(x)
encoded = keras.layers.MaxPool1D(pool_size=2, padding="same")(x)
encoder3 = keras.models.Model(inputs, encoded)
_, encoded_dim1, encoded_dim2 = encoder3.layers[-1].output_shape
# decoder
decoder_inputs = keras.layers.Input(shape=(encoded_dim1, encoded_dim2))
x = keras.layers.Conv1D(
filters=32, kernel_size=3, activation="relu", padding="same"
)(decoder_inputs)
x = keras.layers.UpSampling1D(size=2)(x)
x = keras.layers.Conv1D(
filters=64, kernel_size=3, activation="relu", padding="same"
)(x)
# x = keras.layers.Conv1D(filters=64, kernel_size=2, activation="relu")(x)
x = keras.layers.UpSampling1D(size=2)(x)
decoded = keras.layers.Conv1D(
filters=1, kernel_size=3, activation="linear", padding="same"
)(x)
decoder3 = keras.models.Model(decoder_inputs, decoded, name="only_decoder")
# classifier based on previous encoder
classifier_inputs = keras.layers.Input(shape=(encoded_dim1, encoded_dim2))
x = keras.layers.Conv1D(
filters=16, kernel_size=3, activation="relu", padding="same"
)(classifier_inputs)
x = keras.layers.MaxPool1D(pool_size=2, padding="same")(x)
x = keras.layers.Flatten()(x)
x = keras.layers.Dense(50, activation="relu")(x)
if n_output_classifier >= 2:
classifier_outputs = keras.layers.Dense(
n_output_classifier, activation="softmax"
)(x)
else:
classifier_outputs = keras.layers.Dense(1, activation="sigmoid")(x)
classifier3 = keras.models.Model(
classifier_inputs, classifier_outputs, name="only_classifier"
)
# composite autoencoder
encoded = encoder3(inputs)
decoded = decoder3(encoded)
classified = classifier3(encoded)
composite_autoencoder = keras.models.Model(inputs, [decoded, classified])
return composite_autoencoder, encoder3, decoder3, classifier3
def LSTMFCNClassifier(n_timesteps, n_features, n_output=2, n_LSTM_cells=8):
# https://github.com/titu1994/LSTM-FCN/blob/master/hyperparameter_search.py
inputs = keras.Input(shape=(n_timesteps, n_features), dtype="float32")
x = keras.layers.LSTM(units=n_LSTM_cells)(inputs)
x = keras.layers.Dropout(rate=0.8)(x)
y = keras.layers.Permute((2, 1))(inputs)
y = keras.layers.Conv1D(128, 8, padding="same", kernel_initializer="he_uniform")(y)
y = keras.layers.BatchNormalization()(y)
y = keras.layers.ReLU()(y)
y = keras.layers.Conv1D(256, 5, padding="same", kernel_initializer="he_uniform")(y)
y = keras.layers.BatchNormalization()(y)
y = keras.layers.ReLU()(y)
y = keras.layers.Conv1D(128, 3, padding="same", kernel_initializer="he_uniform")(y)
y = keras.layers.BatchNormalization()(y)
y = keras.layers.ReLU()(y)
y = keras.layers.GlobalAveragePooling1D()(y)
x = keras.layers.concatenate([x, y])
outputs = keras.layers.Dense(n_output, activation="softmax")(x)
classifier = keras.models.Model(inputs=inputs, outputs=outputs)
return classifier
def Classifier_FCN(input_shape, nb_classes):
input_layer = keras.layers.Input(input_shape)
conv1 = keras.layers.Conv1D(filters=128, kernel_size=8, padding="same")(input_layer)
conv1 = keras.layers.BatchNormalization()(conv1)
conv1 = keras.layers.Activation(activation="relu")(conv1)
conv2 = keras.layers.Conv1D(filters=256, kernel_size=5, padding="same")(conv1)
conv2 = keras.layers.BatchNormalization()(conv2)
conv2 = keras.layers.Activation("relu")(conv2)
conv3 = keras.layers.Conv1D(128, kernel_size=3, padding="same")(conv2)
conv3 = keras.layers.BatchNormalization()(conv3)
conv3 = keras.layers.Activation("relu")(conv3)
gap_layer = keras.layers.GlobalAveragePooling1D()(conv3)
output_layer = keras.layers.Dense(nb_classes, activation="softmax")(gap_layer)
model = keras.models.Model(inputs=input_layer, outputs=output_layer)
return model
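# Usage sketch (illustrative), mirroring gc_latentcf_search: build and compile the
# 1dCNN classifier for a binary task, then train with early stopping on validation accuracy.
#
#   classifier = Classifier(n_timesteps, n_features=1, n_output=2)
#   classifier.compile(
#       optimizer=keras.optimizers.legacy.Adam(learning_rate=0.0001),
#       loss="binary_crossentropy", metrics=["accuracy"],
#   )
#   classifier.fit(X_train, y_train_onehot, epochs=150, batch_size=32,
#                  validation_data=(X_val, y_val_onehot))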

@ -0,0 +1,11 @@
fastdtw==0.3.4
joblib==1.4.2
keras==3.5.0
matplotlib==3.9.2
numpy==2.1.0
pandas==2.2.2
scikit_learn==1.5.1
scipy==1.14.1
stumpy==1.13.0
tensorflow==2.17.0
wildboar==1.2.0