-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
2,095 additions
and
1,834 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
# imports | ||
import numpy as np | ||
|
||
from keras.layers import Layer, PReLU, Conv2D, Activation, Conv2DTranspose, GaussianNoise, Lambda, Flatten, Reshape, \ | ||
BatchNormalization, Reshape | ||
from sklearn.model_selection import train_test_split | ||
import tensorflow as tf | ||
import keras | ||
from keras.datasets import cifar10 | ||
from keras.models import Sequential | ||
from keras import backend as K | ||
import math | ||
|
||
save_directory = 'saves1/' | ||
|
||
# Load dataset | ||
(X_train, Y_train), (X_test, Y_test) = cifar10.load_data() | ||
# Divide data into test and validdation | ||
X_test, X_validation, Y_test, Y_validation = train_test_split(X_test, Y_test, test_size=0.33, random_state=42) | ||
|
||
# Normalizing dataset | ||
X_train_norm = X_train / 255 | ||
X_test_norm = X_test / 255 | ||
X_validation_norm = X_validation / 255 | ||
|
||
k = 8 * 8 * 8 | ||
sqrtk = np.sqrt(k / 2) | ||
c = k // 64 | ||
snr = 0 | ||
p = 1 | ||
var = p / math.pow(10, snr / 10) | ||
var = var/2 #to make it complex | ||
std = np.sqrt(var) | ||
n = 32 * 32 * 3 | ||
np.random.seed(1000) | ||
width = 32 | ||
height = 32 | ||
batch_size = 64 | ||
nb_epochs = 15 | ||
code_length = 128 | ||
print(std, std ** 2, 'k/n: ', k / (2 * n)) | ||
|
||
|
||
|
||
K.clear_session() | ||
tf.set_random_seed(0) | ||
np.random.seed(0) | ||
|
||
class ChannelNormalizer(Layer): | ||
|
||
def __init__(self, sqrtk, **kwargs): | ||
# self.output_dim = output_dim | ||
self.sqrtk = sqrtk | ||
super(ChannelNormalizer, self).__init__(**kwargs) | ||
|
||
def build(self, input_shape): | ||
super(ChannelNormalizer, self).build(input_shape) # Be sure to call this at the end | ||
|
||
def call(self, x): | ||
return self.sqrtk * K.l2_normalize(x, axis=1) | ||
|
||
def compute_output_shape(self, input_shape): | ||
return input_shape | ||
|
||
|
||
class ChannelNoise(Layer): | ||
|
||
def __init__(self, sigma, **kwargs): | ||
# self.output_dim = output_dim | ||
self.sigma = sigma | ||
super(ChannelNoise, self).__init__(**kwargs) | ||
|
||
def build(self, input_shape): | ||
# Create a trainable weight variable for this layer. | ||
# self.kernel = self.add_weight(name='kernel', | ||
# shape=(input_shape[1], self.output_dim), | ||
# initializer='uniform', | ||
# trainable=True) | ||
self.inshape = input_shape | ||
super(ChannelNoise, self).build(input_shape) # Be sure to call this at the end | ||
|
||
def call(self, x): | ||
# l2_x = K.sqrt(K.dot(K.flatten(x),K.flatten(x))) | ||
# h = K.random_normal(shape = (1), mean = 0, stddev = self.Hc) | ||
|
||
return x + K.random_normal(self.inshape[1:], mean=0, stddev=self.sigma) | ||
|
||
def compute_output_shape(self, input_shape): | ||
return input_shape | ||
|
||
|
||
# Define model | ||
model = Sequential() | ||
|
||
# Encoder | ||
model.add(Conv2D(16, (5, 5), padding='same', strides=2, input_shape=X_train.shape[1:])) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2D(32, (5, 5), padding='same', strides=2)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2D(32, (5, 5), padding='same', strides=1)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2D(32, (5, 5), padding='same', strides=1)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2D(c, (5, 5), padding='same', strides=1)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization(name='last')) | ||
model.add(Flatten(name='last')) | ||
model.add(ChannelNormalizer(sqrtk, name='normal')) | ||
model.add(ChannelNoise(std, name='noise')) | ||
model.add(Reshape([8, 8, c])) | ||
|
||
# Decoder | ||
model.add(Conv2DTranspose(32, (5, 5), padding='same', strides=1)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2DTranspose(32, (5, 5), padding='same', strides=1)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2DTranspose(32, (5, 5), padding='same', strides=1)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2DTranspose(16, (5, 5), padding='same', strides=2)) | ||
model.add(PReLU(alpha_initializer='zeros', alpha_regularizer=None, alpha_constraint=None, shared_axes=[1, 2])) | ||
# model.add(BatchNormalization()) | ||
model.add(Conv2DTranspose(3, (5, 5), padding='same', strides=2)) | ||
model.add(Activation('sigmoid')) | ||
|
||
|
||
opt = keras.optimizers.Adam(lr=0.001) | ||
|
||
|
||
def PSNR(y_true, y_pred): | ||
return 10 * K.log(K.max(y_true) ** 2 / (K.mean(K.square(y_pred - y_true)))) / K.log(10.0) | ||
|
||
|
||
def schedule(epoch, lr): | ||
if epoch<640: | ||
lr = 0.001 | ||
else: | ||
lr = 0.0001 | ||
return lr | ||
|
||
# from google.colab import files | ||
lrate = keras.callbacks.LearningRateScheduler(schedule, verbose=1) | ||
chckpnt = keras.callbacks.ModelCheckpoint(save_directory + 'ae_0db_1_12_weights.{epoch}-{val_PSNR:.2f}.h5', | ||
monitor='val_PSNR', verbose=0, save_best_only=False, | ||
save_weights_only=True, mode='auto', period=100) | ||
csv = keras.callbacks.CSVLogger(save_directory + 'ae_0db_1_12.log', separator=',', append=True) | ||
opt = keras.optimizers.Adam(lr=0.001) | ||
model.compile(loss='mse', optimizer=opt, metrics=[PSNR]) | ||
# TODO uncomment part below to load weights and continue learning | ||
# model.load_weights() | ||
model.fit(X_train_norm, X_train_norm, | ||
batch_size=64, | ||
epochs=5000, | ||
validation_data=(X_validation_norm, X_validation_norm), | ||
shuffle=True, | ||
callbacks=[lrate, csv, chckpnt]) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
import numpy as np | ||
import keras | ||
from keras.datasets import cifar10 | ||
from keras.models import Sequential | ||
from keras import backend as K | ||
from keras.layers import Layer, PReLU, Conv2D, Activation, Conv2DTranspose , GaussianNoise,BatchNormalization,Conv1D | ||
from sklearn.model_selection import train_test_split | ||
import tensorflow as tf | ||
import math | ||
import numpy as np | ||
from keras.models import Model | ||
from keras.layers import Layer, PReLU, Conv2D, Activation, Conv2DTranspose , GaussianNoise,Lambda, Flatten, Reshape,BatchNormalization,Reshape | ||
from sklearn.model_selection import train_test_split | ||
import tensorflow as tf | ||
import keras | ||
from keras.datasets import cifar10 | ||
from keras.models import Sequential | ||
from keras import backend as K | ||
import math | ||
|
||
from keras.layers import Input, Dense, Lambda | ||
from keras.layers import Conv2D, MaxPooling2D, Flatten | ||
#%% | ||
#make a 'saves' directory beside code to save callbacks and logs | ||
save_directory = 'save/' | ||
|
||
#%% | ||
|
||
# Load dataset | ||
(X_train, Y_train), (X_test, Y_test) = cifar10.load_data() | ||
# Divide data into test and validdation | ||
X_test, X_validation, Y_test,Y_validation = train_test_split(X_test, Y_test, test_size=0.33, random_state=42) | ||
|
||
# Normalizing dataset | ||
X_train_norm = X_train/255 | ||
X_test_norm = X_test/255 | ||
X_validation_norm = X_validation/255 | ||
|
||
#%% | ||
|
||
for snr in range(0, 20): | ||
k = 8 * 8 * 8 | ||
n = 32 * 32 * 3 | ||
# Make sure we devide k by two in the line below | ||
sqrtk = np.sqrt(k / 2) | ||
c = k // 64 | ||
p = 1 | ||
var = p / math.pow(10, snr / 10) | ||
var = var / 2 # var should be devided by 2 | ||
std = np.sqrt(var) | ||
np.random.seed(1000) | ||
width = 32 | ||
height = 32 | ||
batch_size = 64 | ||
nb_epochs = 15 | ||
code_length = 128 | ||
print(std, std ** 2, 'k/n: ', k / (2 * n)) | ||
# %% | ||
|
||
K.clear_session() | ||
tf.random.set_seed(0) | ||
np.random.seed(0) | ||
|
||
# from keras.mode import Model | ||
# encoder part | ||
input = Input(shape=(32, 32, 3)) | ||
conv_1 = Conv2D(16, (5, 5), padding='same', strides=2, activation='relu')(input) | ||
conv_2 = Conv2D(32, (5, 5), padding='same', strides=2, activation='relu')(conv_1) | ||
conv_3 = Conv2D(32, (5, 5), padding='same', strides=1, activation='relu')(conv_2) | ||
conv_4 = Conv2D(32, (5, 5), padding='same', strides=1, activation='relu')(conv_3) | ||
encoded = Conv2D(c, (5, 5), padding='same', strides=1, activation='relu')(conv_4) | ||
|
||
z_mean = Conv2D(c, (5, 5), padding='same', strides=1, activation='relu')(encoded) | ||
z_log_var = Conv2D(c, (5, 5), padding='same', strides=1, activation='relu')(encoded) | ||
|
||
|
||
# %% | ||
|
||
def sampling(args): | ||
z_mean, z_log_var = args | ||
epsilon = tf.random.normal( | ||
shape=(K.shape(z_mean)[0], K.shape(z_mean)[1], K.shape(z_mean)[2], K.shape(z_mean)[3]), mean=0., | ||
stddev=1.0) | ||
return z_mean + K.exp(z_log_var / 2) * epsilon | ||
|
||
|
||
from keras.layers import Input, Lambda, Reshape, Flatten | ||
|
||
z = Lambda(sampling, output_shape=(8, 8, c))([z_mean, z_log_var]) | ||
z = Flatten()(z) | ||
|
||
|
||
# %% | ||
|
||
class ChannelNormalizer(Layer): | ||
|
||
def __init__(self, sqrtk, **kwargs): | ||
self.sqrtk = sqrtk | ||
super(ChannelNormalizer, self).__init__(**kwargs) | ||
|
||
def build(self, input_shape): | ||
super(ChannelNormalizer, self).build(input_shape) # Be sure to call this at the end | ||
|
||
def call(self, x): | ||
return self.sqrtk * K.l2_normalize(x, axis=1) | ||
|
||
def compute_output_shape(self, input_shape): | ||
return input_shape | ||
|
||
|
||
z = ChannelNormalizer(sqrtk, name='normal')(z) | ||
|
||
|
||
# %% | ||
class ChannelNoise(Layer): | ||
|
||
def __init__(self, sigma, **kwargs): | ||
self.sigma = sigma | ||
super(ChannelNoise, self).__init__(**kwargs) | ||
|
||
def build(self, input_shape): | ||
self.inshape = input_shape | ||
super(ChannelNoise, self).build(input_shape) | ||
|
||
def call(self, x): | ||
return x + tf.random.normal(self.inshape[1:], mean=0, stddev=self.sigma) | ||
|
||
def compute_output_shape(self, input_shape): | ||
return input_shape | ||
|
||
|
||
z = ChannelNoise(std)(z) | ||
# %% | ||
|
||
z = Reshape([8, 8, c])(z) | ||
conv_0T = Conv2DTranspose(32, (5, 5), padding='same', strides=1, activation='relu')(z) | ||
conv_1T = Conv2DTranspose(32, (5, 5), padding='same', strides=1, activation='relu')(conv_0T) | ||
conv_2T = Conv2DTranspose(32, (5, 5), padding='same', strides=1, activation='relu')(conv_1T) | ||
conv_3T = Conv2DTranspose(16, (5, 5), padding='same', strides=2, activation='relu')(conv_2T) | ||
x_out = Conv2DTranspose(3, (5, 5), padding='same', strides=2, activation='sigmoid')(conv_3T) | ||
|
||
# %% | ||
|
||
vae = Model(input, x_out) | ||
|
||
|
||
|
||
def VAE_loss(x_origin, x_out): | ||
reconstruction_loss = tf.reduce_mean(tf.reduce_sum(tf.square(x_origin - x_out), axis=[1, 2, 3])) | ||
kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1) | ||
kl_loss = tf.reduce_mean(kl_loss) | ||
loss_sum = kl_loss + 32 * 32 * 3 * reconstruction_loss | ||
return loss_sum | ||
|
||
|
||
def PSNR(y_true, y_pred): | ||
return 10 * K.log(K.max(y_true) ** 2 / (K.mean(K.square(y_pred - y_true)))) / K.log(10.0) | ||
|
||
|
||
opt = keras.optimizers.Adam(lr=0.001) | ||
|
||
vae.compile(optimizer=opt, loss=VAE_loss, metrics=[PSNR]) | ||
|
||
vae.load_weights() | ||
|
||
vae.evaluate(X_test_norm, X_test_norm) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.