-
Notifications
You must be signed in to change notification settings - Fork 1
/
nt_gg.py
282 lines (239 loc) · 12.3 KB
/
nt_gg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
from keras.layers import Dense, Dropout, LeakyReLU
from keras.models import Sequential
from keras.optimizers import Adam
from numpy.random import randn
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from tqdm import tqdm
class GG(object):
    """General Generator (GG).

    Pairs a simple MLP generator (Keras) with a black-box RandomForest
    "discriminator" (sklearn). The generator is trained so that, given noise
    plus a target confidence value, it emits a sample to which the black box
    assigns (approximately) that confidence.
    """

    def __init__(self, number_of_features, saved_models_path, learning_rate, dropout, alpha):
        """
        The constructor for the General Generator class.

        :param number_of_features: Number of features in the data. Used to determine the noise dimensions.
        :param saved_models_path: The folder where we save the models.
        :param learning_rate: Adam learning rate for the generator.
        :param dropout: Dropout rate for the generator's hidden layers.
        :param alpha: Negative-slope coefficient for the LeakyReLU activations.
        """
        self.saved_models_path = saved_models_path
        # NOTE: the original assigned number_of_features twice; once is enough.
        self.number_of_features = number_of_features
        self.generator_model = None
        self.discriminator_model = RandomForestClassifier()
        self.dropout = dropout
        self.alpha = alpha
        # Noise vector is half the feature count (generator input = noise + 1 confidence).
        self.noise_dim = int(number_of_features / 2)
        self.learning_rate = learning_rate
        self.build_generator()  # build the generator network up front
        self.losses = {'gen_loss': [], 'dis_loss_pred': [], 'dis_loss_proba': []}

    def build_generator(self):
        """
        Create the generator model for the GG.

        A fairly simple MLP: input is noise_dim + 1 (the extra slot carries the
        target confidence), hidden layers expand to 2x and 4x the feature count,
        and the sigmoid output produces number_of_features values in [0, 1]
        (the data is assumed normalized to that range).

        :return: None. Sets ``self.generator_model`` (compiled).
        """
        self.generator_model = Sequential()
        self.generator_model.add(Dense(int(self.number_of_features * 2), input_shape=(self.noise_dim + 1, )))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))
        self.generator_model.add(Dense(int(self.number_of_features * 4)))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))
        self.generator_model.add(Dropout(self.dropout))
        self.generator_model.add(Dense(int(self.number_of_features * 2)))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))
        self.generator_model.add(Dropout(self.dropout))
        self.generator_model.add(Dense(self.number_of_features, activation='sigmoid'))
        optimizer = Adam(lr=self.learning_rate)
        self.generator_model.compile(loss='categorical_crossentropy', optimizer=optimizer)
        # self.generator_model.summary()

    def train_gg(self, x_train, y_train, epochs, batch_size, model_name, data, output_path, to_plot=False):
        """
        Run the full training pipeline: discriminator first, then generator.

        :param output_path: Path to save the loss figure.
        :param to_plot: Plots the losses if True.
        :param x_train: the training set features
        :param y_train: the training set classes
        :param model_name: name of model to save (for generator)
        :param data: Name of the dataset (used in file/plot names).
        :param epochs: number of epochs
        :param batch_size: the batch size
        :return: None. Trains the discriminator and generator.
        """
        losses_path = os.path.join(self.saved_models_path, f'{model_name}_losses')
        model_file = os.path.join(self.saved_models_path, f'{model_name}_part_2_gen_weights.h5')
        # The black box must exist before the generator can query it for probabilities.
        self.train_black_box_dis(x_train, y_train)
        self.train_generator(x_train, model_file, epochs, batch_size, losses_path)
        if to_plot:
            self.plot_losses(data, output_path)

    def train_black_box_dis(self, x_train, y_train):
        """
        Train the black-box discriminator and persist it, or load a saved one.

        :param x_train: the training set features
        :param y_train: the training set classes
        :return: None. Sets ``self.discriminator_model``.
        """
        dis_output = os.path.join(self.saved_models_path, 'black_box_dis_model')
        if os.path.exists(dis_output):
            # Already trained: load and skip re-fitting. (The original fell
            # through and re-fit + re-saved the model despite loading it.)
            # NOTE: pickle.load is only safe because this file is produced
            # locally by this class — never load untrusted pickles.
            with open(dis_output, 'rb') as rf_file:
                self.discriminator_model = pickle.load(rf_file)
            return
        self.discriminator_model.fit(x_train, y_train)
        with open(dis_output, 'wb') as rf_file:
            pickle.dump(self.discriminator_model, rf_file)

    def train_generator(self, data, model_path, epochs, start_batch_size, losses_path):
        """
        Train the general generator, or load previously saved weights.

        :param losses_path: The filepath for the loss results.
        :param data: The normalized dataset. NOTE: shuffled IN PLACE each epoch.
        :param model_path: The name of the model to save. Includes epoch size, batches etc.
        :param epochs: Number of epochs.
        :param start_batch_size: Size of batch to use.
        :return: None. Trains the generator, saves it and the losses during training.
        """
        if os.path.exists(model_path):
            # Resume from a previous run: restore weights and loss history.
            self.generator_model.load_weights(model_path)
            with open(losses_path, 'rb') as loss_file:
                self.losses = pickle.load(loss_file)
            return
        for epoch in range(epochs):  # iterate over the epochs
            np.random.shuffle(data)  # in-place shuffle of the caller's array
            batch_size = start_batch_size
            for i in tqdm(range(0, data.shape[0], batch_size), ascii=True):  # iterate over batches
                if data.shape[0] - i >= batch_size:
                    batch_input = data[i:i + batch_size]
                else:  # the last, possibly smaller, batch
                    batch_input = data[i:]
                    batch_size = batch_input.shape[0]
                g_loss = self.train_generator_on_batch(batch_input)
                self.losses['gen_loss'].append(g_loss)
        self.save_generator_model(model_path, losses_path)

    def save_generator_model(self, generator_model_path, losses_path):
        """
        Save the generator weights and the loss history (pickle).

        :param generator_model_path: File path for the generator weights.
        :param losses_path: File path for the losses.
        :return: None.
        """
        self.generator_model.save_weights(generator_model_path)
        with open(losses_path, 'wb+') as loss_file:
            pickle.dump(self.losses, loss_file)

    def train_generator_on_batch(self, batch_input):
        """
        Train the generator for a single batch.

        Input to the generator is noise concatenated with the black box's
        confidence for the real samples; the training target is the real
        samples concatenated with the generator's own predicted confidences.

        :param batch_input: 2-D array of real (normalized) samples.
        :return: The batch training loss.
        """
        batch_size = batch_input.shape[0]
        # Probability of the positive class, kept 2-D via the [:, -1:] slice.
        discriminator_probabilities = self.discriminator_model.predict_proba(batch_input)[:, -1:]
        noise = randn(batch_size, self.noise_dim)
        gen_model_input = np.hstack([noise, discriminator_probabilities])
        generated_probabilities = self.generator_model.predict(gen_model_input)[:, -1:]  # take only probabilities
        target_output = np.hstack([batch_input, generated_probabilities])
        g_loss = self.generator_model.train_on_batch(gen_model_input, target_output)  # the actual training
        return g_loss

    def plot_discriminator_results(self, x_test, y_test, data, path):
        """
        Plot class distribution and per-class confidence histograms, and print
        accuracy plus min/max/mean confidence per class.

        :param x_test: Test set.
        :param y_test: Test classes (assumed binary 0/1).
        :param data: Name of the dataset (used in plot titles).
        :param path: Directory in which to save the plots.
        :return: Prints the required plots.
        """
        blackbox_probs = self.discriminator_model.predict_proba(x_test)
        discriminator_predictions = self.discriminator_model.predict(x_test)
        count_1 = int(np.sum(y_test))
        count_0 = int(y_test.shape[0] - count_1)
        class_data = (['Class 0', 'Class 1'], [count_0, count_1])
        self.plot_data(class_data, path, mode='bar', x_title='Class', title=f'Distribution of classes - {data} dataset')
        self.plot_data(blackbox_probs[:, 0], path, title=f'Probabilities for test set - class 0 - {data} dataset')
        self.plot_data(blackbox_probs[:, 1], path, title=f'Probabilities for test set - class 1 - {data} dataset')
        min_confidence = blackbox_probs[:, 0].min(), blackbox_probs[:, 1].min()
        max_confidence = blackbox_probs[:, 0].max(), blackbox_probs[:, 1].max()
        mean_confidence = blackbox_probs[:, 0].mean(), blackbox_probs[:, 1].mean()
        print("Accuracy:", metrics.accuracy_score(y_test, discriminator_predictions))
        for c in [0, 1]:
            print(f'Class {c} - Min confidence: {min_confidence[c]} - Max Confidence: {max_confidence[c]} - '
                  f'Mean confidence: {mean_confidence[c]}')

    def plot_generator_results(self, data, path, num_of_instances=1000):
        """
        Create plots for the generator results on generated instances.

        :param path: Directory in which to save the plots.
        :param data: Name of dataset used.
        :param num_of_instances: Number of samples to generate.
        :return: None (saves plots).
        """
        sampled_proba, generated_instances = self.generate_n_samples(num_of_instances)
        # Last generated column is the confidence; the black box sees only features.
        proba_fake = self.discriminator_model.predict_proba(generated_instances[:, :-1])
        for c in [0, 1]:
            title = f'Confidence Score for Class {c} of Fake Samples - {data} dataset'
            self.plot_data(proba_fake[:, c], path, x_title='Confidence Score', title=title)
        black_box_confidence = proba_fake[:, 1:]
        proba_error = np.abs(sampled_proba - black_box_confidence)
        # BUGFIX: was hard-coded .reshape(1000, 1), which broke for any other
        # num_of_instances; reshape(-1, 1) adapts to the actual sample count.
        generated_classes = np.array([int(round(c)) for c in generated_instances[:, -1].tolist()]).reshape(-1, 1)
        # Columns: sampled confidence, rounded class, p(class 0), p(class 1), abs error.
        proba_stats = np.hstack([sampled_proba, generated_classes, proba_fake[:, :1], proba_fake[:, 1:], proba_error])
        for c in [0, 1]:
            class_data = proba_stats[proba_stats[:, 1] == c]
            class_data = class_data[class_data[:, 0].argsort()]  # sort it for the plot
            title = f'Error rate for different probabilities, class {c} - {data} dataset'
            self.plot_data((class_data[:, 0], class_data[:, -1]), path, mode='plot', y_title='error rate', title=title)

    def generate_n_samples(self, n):
        """
        Generate N samples with uniformly distributed confidence levels.

        :param n: Number of samples.
        :return: a tuple of the confidence scores used and the samples created.
        """
        noise = randn(n, self.noise_dim)
        confidences = np.random.uniform(0, 1, (n, 1))
        generator_input = np.hstack([noise, confidences])  # stick them together
        generated_instances = self.generator_model.predict(generator_input)  # create samples
        return confidences, generated_instances

    @staticmethod
    def plot_data(data, path, mode='hist', x_title='Probabilities', y_title='# of Instances', title='Distribution'):
        """
        Render a single plot and save it under ``path`` using the title as filename.

        :param path: Path to save.
        :param mode: 'hist' (1-D data), 'bar' or 'plot' (data is an (x, y) pair).
        :param y_title: Title of y axis.
        :param x_title: Title of x axis.
        :param data: Data to plot.
        :param title: Title of plot (also used as the output filename).
        :return: Prints a plot.
        """
        plt.clf()
        if mode == 'hist':
            plt.hist(data)
        elif mode == 'bar':
            plt.bar(data[0], data[1])
        else:
            plt.plot(data[0], data[1])
        plt.title(title)
        plt.ylabel(y_title)
        plt.xlabel(x_title)
        path = os.path.join(path, title)
        plt.savefig(path)

    def plot_losses(self, data, path):
        """
        Plot the generator losses accumulated during training and save the figure.

        :param data: Name of the dataset (used in the output filename).
        :param path: Directory in which to save the figure.
        :return: None.
        """
        plt.clf()
        plt.plot(self.losses['gen_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Iteration')
        plt.savefig(os.path.join(path, f'{data} dataset - general_generator_loss.png'))

    def get_error(self, num_of_instances=1000):
        """
        Calculate the generator's error: the absolute difference between the
        confidence given as input and the black box's confidence on the
        sample created from it.

        :param num_of_instances: Number of samples to generate.
        :return: An array of errors, shape (num_of_instances, 1).
        """
        sampled_proba, generated_instances = self.generate_n_samples(num_of_instances)
        proba_fake = self.discriminator_model.predict_proba(generated_instances[:, :-1])
        black_box_confidence = proba_fake[:, 1:]
        return np.abs(sampled_proba - black_box_confidence)