diff --git a/2_CNN/Conv.py b/2_CNN/Conv.py
new file mode 100644
index 0000000000000000000000000000000000000000..668208c89fdaaaca3d49724b529d37f26a71a400
--- /dev/null
+++ b/2_CNN/Conv.py
@@ -0,0 +1,209 @@
+import copy
+import math
+
+import numpy as np
+from scipy import signal as sp
+
+from .Base import BaseLayer
+
+
+class Conv(BaseLayer):
+    def __init__(self, stride_shape, convolution_shape, num_kernels):
+        super().__init__()
+        self.trainable = True
+        self.conv = True
+        self.stride_shape = stride_shape
+        self.convolution_shape = convolution_shape
+        self.num_kernels = num_kernels
+        self.conv1d = False
+        self.output_tensor = None
+        self.input_tensor = None
+        self.error_tensor = None
+
+        # A 2-element convolution_shape (channels, m) means 1D convolution,
+        # a 3-element shape (channels, m, n) means 2D convolution.
+        if len(convolution_shape) == 2:
+            self.kernels_shape = (num_kernels, convolution_shape[0], convolution_shape[1])
+            self.conv1d = True
+        else:
+            self.kernels_shape = (num_kernels, convolution_shape[0],
+                                  convolution_shape[1], convolution_shape[2])
+
+        self.weights = np.random.uniform(0, 1, self.kernels_shape)
+        self.bias = np.random.uniform(0, 1, self.num_kernels)
+
+        self._gradient_weights = None
+        self._gradient_bias = None
+        self._optimizer = None
+        self.optimizer_b = None
+        self.optimizer_w = None
+
+    @property
+    def gradient_weights(self):
+        return self._gradient_weights
+
+    @gradient_weights.setter
+    def gradient_weights(self, w):
+        self._gradient_weights = w
+
+    @property
+    def gradient_bias(self):
+        return self._gradient_bias
+
+    @gradient_bias.setter
+    def gradient_bias(self, b):
+        self._gradient_bias = b
+
+    @property
+    def optimizer(self):
+        return self._optimizer
+
+    @optimizer.setter
+    def optimizer(self, ow):
+        # Weights and bias need independent optimizer state (e.g. momentum),
+        # so each gets its own deep copy.
+        self._optimizer = ow
+        self.optimizer_b = copy.deepcopy(self._optimizer)
+        self.optimizer_w = copy.deepcopy(self._optimizer)
+
+    def forward(self, input_tensor):
+        self.input_tensor = input_tensor
+
+        if self.conv1d:
+            self.output_tensor = np.empty(
+                (input_tensor.shape[0], self.num_kernels,
+                 math.ceil(input_tensor.shape[2] / self.stride_shape[0])))
+        else:
+            self.output_tensor = np.empty(
+                (input_tensor.shape[0], self.num_kernels,
+                 math.ceil(input_tensor.shape[2] / self.stride_shape[0]),
+                 math.ceil(input_tensor.shape[3] / self.stride_shape[1])))
+
+        # Index of the middle channel: with 'same'-mode correlation over all
+        # axes, this channel holds the 'valid' reduction across channels.
+        c = input_tensor.shape[1]
+        tc = c // 2
+
+        for i in range(input_tensor.shape[0]):  # every element of the batch
+            curr_image = input_tensor[i]
+            for j in range(self.num_kernels):
+                output_image = sp.correlate(curr_image, self.weights[j], mode='same')[tc]
+
+                # Subsample according to the stride.
+                if len(self.stride_shape) == 2:
+                    output_image_samp = output_image[::self.stride_shape[0],
+                                                     ::self.stride_shape[1]]
+                elif self.conv1d:
+                    output_image_samp = output_image[::self.stride_shape[0]]
+                else:
+                    # a single stride value is applied to both spatial axes
+                    output_image_samp = output_image[::self.stride_shape[0],
+                                                     ::self.stride_shape[0]]
+
+                self.output_tensor[i][j] = output_image_samp + self.bias[j]
+
+        return self.output_tensor
+
+    def backward(self, error_tensor):
+        self.error_tensor = error_tensor
+
+        # Undo the striding: re-insert zeros so the error tensor matches the
+        # spatial size of the input.
+        if self.conv1d:
+            compensated_error_tensor = np.zeros(
+                (self.input_tensor.shape[0], self.num_kernels, self.input_tensor.shape[2]))
+            for k in range(error_tensor.shape[2]):
+                compensated_error_tensor[:, :, k * self.stride_shape[0]] = error_tensor[:, :, k]
+        else:
+            compensated_error_tensor = np.zeros(
+                (self.input_tensor.shape[0], self.num_kernels,
+                 self.input_tensor.shape[2], self.input_tensor.shape[3]))
+            for k in range(error_tensor.shape[2]):
+                for l in range(error_tensor.shape[3]):
+                    compensated_error_tensor[:, :, k * self.stride_shape[0],
+                                             l * self.stride_shape[1]] = error_tensor[:, :, k, l]
+
+        self.error_tensor = compensated_error_tensor
+
+        # Gradient w.r.t. the weights: pad the input so a 'valid' correlation
+        # with the upsampled error tensor yields kernel-sized gradients.
+        # For even kernel sizes the padding is asymmetric.
+        kernel_rows = self.weights.shape[2]
+        input_tensor_copy = np.copy(self.input_tensor)
+
+        if self.conv1d:
+            input_tensor_copy = np.pad(
+                input_tensor_copy,
+                ((0, 0), (0, 0),
+                 (kernel_rows // 2 - (1 - kernel_rows % 2), kernel_rows // 2)),
+                mode='constant', constant_values=0)
+        else:
+            kernel_cols = self.weights.shape[3]
+            input_tensor_copy = np.pad(
+                input_tensor_copy,
+                ((0, 0), (0, 0),
+                 (kernel_rows // 2 - (1 - kernel_rows % 2), kernel_rows // 2),
+                 (kernel_cols // 2 - (1 - kernel_cols % 2), kernel_cols // 2)),
+                mode='constant', constant_values=0)
+
+        self._gradient_weights = np.empty(self.weights.shape)
+
+        # Correlating over the batch axis in 'valid' mode sums the
+        # per-sample gradients in a single call.
+        for i in range(input_tensor_copy.shape[1]):
+            for j in range(self.error_tensor.shape[1]):
+                self._gradient_weights[j][i] = sp.correlate(
+                    input_tensor_copy[:, i], self.error_tensor[:, j], mode='valid')
+
+        # Gradient w.r.t. the bias: sum the error over the batch axis,
+        # then over all spatial axes.
+        g1 = np.sum(self.error_tensor, axis=0)
+        for i in range(len(self.error_tensor.shape) - 2):
+            g1 = np.sum(g1, axis=1)
+        self._gradient_bias = g1
+
+        if self.optimizer_w is not None:
+            self.weights = self.optimizer_w.calculate_update(self.weights, self._gradient_weights)
+        if self.optimizer_b is not None:
+            self.bias = self.optimizer_b.calculate_update(self.bias, self._gradient_bias)
+
+        # Error tensor for the previous layer: swap the kernel and channel
+        # axes of the weights, then convolve with the upsampled error.
+        num_channels = self.error_tensor.shape[1]
+        if self.conv1d:
+            kernels_back = np.empty(
+                (self.convolution_shape[0], self.num_kernels, self.convolution_shape[1]))
+        else:
+            kernels_back = np.empty(
+                (self.convolution_shape[0], self.num_kernels,
+                 self.convolution_shape[1], self.convolution_shape[2]))
+
+        for i in range(self.convolution_shape[0]):
+            for j in range(num_channels):
+                kernels_back[i][j] = self.weights[j][i]
+
+        # sp.convolve flips every axis, including the stacked-kernel axis;
+        # flipping that axis here in advance compensates for it.
+        kernels_back = np.flip(kernels_back, 1)
+
+        tc = num_channels // 2  # middle channel of the 'same' convolution
+
+        if self.conv1d:
+            op = np.empty((self.error_tensor.shape[0], self.convolution_shape[0],
+                           self.error_tensor.shape[2]))
+        else:
+            op = np.empty((self.error_tensor.shape[0], self.convolution_shape[0],
+                           self.error_tensor.shape[2], self.error_tensor.shape[3]))
+
+        for i in range(self.error_tensor.shape[0]):  # every element of the batch
+            for j in range(self.convolution_shape[0]):  # every input channel
+                op[i, j] = sp.convolve(self.error_tensor[i], kernels_back[j], mode='same')[tc]
+
+        return op
+
+    def initialize(self, weights_initializer, bias_initializer):
+        fan_in = np.prod(np.array(self.convolution_shape))
+        fan_out = np.prod(np.array(self.convolution_shape[1:])) * self.num_kernels
+
+        self.weights = weights_initializer.initialize(self.weights.shape, fan_in, fan_out)
+        self.bias = bias_initializer.initialize(self.num_kernels, fan_in, fan_out)
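
For reference, a minimal smoke test of the layer's forward and backward passes, not part of the diff. The import path is an assumption (the `2_CNN` directory name is not a valid module name, so the test harness presumably arranges how `Conv` and its `BaseLayer` dependency are loaded); shapes follow the (batch, channels, y, x) convention used in `forward`.

```python
import numpy as np

from Conv import Conv  # hypothetical import; depends on how 2_CNN is put on the path

batch, channels, y, x = 2, 3, 10, 14
conv = Conv(stride_shape=(2, 2), convolution_shape=(channels, 3, 3), num_kernels=4)

# Forward: spatial dimensions are divided by the stride and rounded up.
input_tensor = np.random.uniform(size=(batch, channels, y, x))
output = conv.forward(input_tensor)
assert output.shape == (batch, 4, 5, 7)  # ceil(10/2) = 5, ceil(14/2) = 7

# Backward: takes an error tensor shaped like the forward output and returns
# the error w.r.t. the layer's input; with no optimizer set, no update runs.
error_prev = conv.backward(np.random.uniform(size=output.shape))
assert error_prev.shape == input_tensor.shape
```

Setting `conv.optimizer` before calling `backward` would additionally update `weights` and `bias` through the two deep-copied optimizer instances.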