| @@ -0,0 +1,8 @@ | |||
| # Default ignored files | |||
| /shelf/ | |||
| /workspace.xml | |||
| # Editor-based HTTP Client requests | |||
| /httpRequests/ | |||
| # Datasource local storage ignored files | |||
| /dataSources/ | |||
| /dataSources.local.xml | |||
| @@ -0,0 +1,10 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
| <module type="PYTHON_MODULE" version="4"> | |||
| <component name="NewModuleRootManager"> | |||
| <content url="file://$MODULE_DIR$"> | |||
| <excludeFolder url="file://$MODULE_DIR$/.venv" /> | |||
| </content> | |||
| <orderEntry type="inheritedJdk" /> | |||
| <orderEntry type="sourceFolder" forTests="false" /> | |||
| </component> | |||
| </module> | |||
| @@ -0,0 +1,30 @@ | |||
| <component name="InspectionProjectProfileManager"> | |||
| <profile version="1.0"> | |||
| <option name="myName" value="Project Default" /> | |||
| <inspection_tool class="PyMethodParametersInspection" enabled="false" level="WEAK WARNING" enabled_by_default="false" /> | |||
| <inspection_tool class="PyPep8Inspection" enabled="true" level="WEAK WARNING" enabled_by_default="true"> | |||
| <option name="ignoredErrors"> | |||
| <list> | |||
| <option value="E111" /> | |||
| <option value="E301" /> | |||
| <option value="E252" /> | |||
| <option value="E501" /> | |||
| <option value="E231" /> | |||
| <option value="E302" /> | |||
| <option value="E305" /> | |||
| <option value="E402" /> | |||
| <option value="E401" /> | |||
| <option value="E114" /> | |||
| <option value="E261" /> | |||
| </list> | |||
| </option> | |||
| </inspection_tool> | |||
| <inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true"> | |||
| <option name="ignoredIdentifiers"> | |||
| <list> | |||
| <option value="bool.*" /> | |||
| </list> | |||
| </option> | |||
| </inspection_tool> | |||
| </profile> | |||
| </component> | |||
| @@ -0,0 +1,6 @@ | |||
| <component name="InspectionProjectProfileManager"> | |||
| <settings> | |||
| <option name="USE_PROJECT_PROFILE" value="false" /> | |||
| <version value="1.0" /> | |||
| </settings> | |||
| </component> | |||
| @@ -0,0 +1,7 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
| <project version="4"> | |||
| <component name="Black"> | |||
| <option name="sdkName" value="Python 3.10 (image-recognizer)" /> | |||
| </component> | |||
| <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (image-recognizer)" project-jdk-type="Python SDK" /> | |||
| </project> | |||
| @@ -0,0 +1,8 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
| <project version="4"> | |||
| <component name="ProjectModuleManager"> | |||
| <modules> | |||
| <module fileurl="file://$PROJECT_DIR$/.idea/image-recognizer.iml" filepath="$PROJECT_DIR$/.idea/image-recognizer.iml" /> | |||
| </modules> | |||
| </component> | |||
| </project> | |||
| @@ -0,0 +1,185 @@ | |||
| from colorspacious import cspace_converter | |||
| import numpy as np | |||
| from matplotlib import pyplot as plt | |||
| import matplotlib as mpl | |||
| cmaps = {} | |||
| gradient = np.linspace(0, 1, 256) | |||
| gradient = np.vstack((gradient, gradient)) | |||
| def plot_color_gradients(category, cmap_list): | |||
| # Create figure and adjust figure height to number of colormaps | |||
| nrows = len(cmap_list) | |||
| figh = 0.35 + 0.15 + (nrows + (nrows - 1) * 0.1) * 0.22 | |||
| fig, axs = plt.subplots(nrows=nrows + 1, figsize=(6.4, figh)) | |||
| fig.subplots_adjust(top=1 - 0.35 / figh, bottom=0.15 / figh, | |||
| left=0.2, right=0.99) | |||
| axs[0].set_title(f'{category} colormaps', fontsize=14) | |||
| for ax, name in zip(axs, cmap_list): | |||
| ax.imshow(gradient, aspect='auto', cmap=mpl.colormaps[name]) | |||
| ax.text(-0.01, 0.5, name, va='center', ha='right', fontsize=10, | |||
| transform=ax.transAxes) | |||
| # Turn off *all* ticks & spines, not just the ones with colormaps. | |||
| for ax in axs: | |||
| ax.set_axis_off() | |||
| # Save colormap list for later. | |||
| cmaps[category] = cmap_list | |||
| plot_color_gradients('Perceptually Uniform Sequential', | |||
| ['viridis', 'plasma', 'inferno', 'magma', 'cividis']) | |||
| plot_color_gradients('Sequential', | |||
| ['Greys', 'Purples', 'Blues', 'Greens', 'Oranges', 'Reds', | |||
| 'YlOrBr', 'YlOrRd', 'OrRd', 'PuRd', 'RdPu', 'BuPu', | |||
| 'GnBu', 'PuBu', 'YlGnBu', 'PuBuGn', 'BuGn', 'YlGn']) | |||
| plot_color_gradients('Sequential (2)', | |||
| ['binary', 'gist_yarg', 'gist_gray', 'gray', 'bone', | |||
| 'pink', 'spring', 'summer', 'autumn', 'winter', 'cool', | |||
| 'Wistia', 'hot', 'afmhot', 'gist_heat', 'copper']) | |||
| plot_color_gradients('Diverging', | |||
| ['PiYG', 'PRGn', 'BrBG', 'PuOr', 'RdGy', 'RdBu', 'RdYlBu', | |||
| 'RdYlGn', 'Spectral', 'coolwarm', 'bwr', 'seismic']) | |||
| plot_color_gradients('Cyclic', ['twilight', 'twilight_shifted', 'hsv']) | |||
| plot_color_gradients('Qualitative', | |||
| ['Pastel1', 'Pastel2', 'Paired', 'Accent', 'Dark2', | |||
| 'Set1', 'Set2', 'Set3', 'tab10', 'tab20', 'tab20b', | |||
| 'tab20c']) | |||
| plot_color_gradients('Miscellaneous', | |||
| ['flag', 'prism', 'ocean', 'gist_earth', 'terrain', | |||
| 'gist_stern', 'gnuplot', 'gnuplot2', 'CMRmap', | |||
| 'cubehelix', 'brg', 'gist_rainbow', 'rainbow', 'jet', | |||
| 'turbo', 'nipy_spectral', 'gist_ncar']) | |||
| plt.show() | |||
| mpl.rcParams.update({'font.size': 12}) | |||
| # Number of colormap per subplot for particular cmap categories | |||
| _DSUBS = {'Perceptually Uniform Sequential': 5, 'Sequential': 6, | |||
| 'Sequential (2)': 6, 'Diverging': 6, 'Cyclic': 3, | |||
| 'Qualitative': 4, 'Miscellaneous': 6} | |||
| # Spacing between the colormaps of a subplot | |||
| _DC = {'Perceptually Uniform Sequential': 1.4, 'Sequential': 0.7, | |||
| 'Sequential (2)': 1.4, 'Diverging': 1.4, 'Cyclic': 1.4, | |||
| 'Qualitative': 1.4, 'Miscellaneous': 1.4} | |||
| # Indices to step through colormap | |||
| x = np.linspace(0.0, 1.0, 100) | |||
| # Do plot | |||
| for cmap_category, cmap_list in cmaps.items(): | |||
| # Do subplots so that colormaps have enough space. | |||
| # Default is 6 colormaps per subplot. | |||
| dsub = _DSUBS.get(cmap_category, 6) | |||
| nsubplots = int(np.ceil(len(cmap_list) / dsub)) | |||
| # squeeze=False to handle similarly the case of a single subplot | |||
| fig, axs = plt.subplots(nrows=nsubplots, squeeze=False, | |||
| figsize=(7, 2.6*nsubplots)) | |||
| for i, ax in enumerate(axs.flat): | |||
| locs = [] # locations for text labels | |||
| for j, cmap in enumerate(cmap_list[i*dsub:(i+1)*dsub]): | |||
| # Get RGB values for colormap and convert the colormap in | |||
| # CAM02-UCS colorspace. lab[0, :, 0] is the lightness. | |||
| rgb = mpl.colormaps[cmap](x)[np.newaxis, :, :3] | |||
| lab = cspace_converter("sRGB1", "CAM02-UCS")(rgb) | |||
| # Plot colormap L values. Do separately for each category | |||
| # so each plot can be pretty. To make scatter markers change | |||
| # color along plot: | |||
| # https://stackoverflow.com/q/8202605/ | |||
| if cmap_category == 'Sequential': | |||
| # These colormaps all start at high lightness, but we want them | |||
| # reversed to look nice in the plot, so reverse the order. | |||
| y_ = lab[0, ::-1, 0] | |||
| c_ = x[::-1] | |||
| else: | |||
| y_ = lab[0, :, 0] | |||
| c_ = x | |||
| dc = _DC.get(cmap_category, 1.4) # cmaps horizontal spacing | |||
| ax.scatter(x + j*dc, y_, c=c_, cmap=cmap, s=300, linewidths=0.0) | |||
| # Store locations for colormap labels | |||
| if cmap_category in ('Perceptually Uniform Sequential', | |||
| 'Sequential'): | |||
| locs.append(x[-1] + j*dc) | |||
| elif cmap_category in ('Diverging', 'Qualitative', 'Cyclic', | |||
| 'Miscellaneous', 'Sequential (2)'): | |||
| locs.append(x[int(x.size/2.)] + j*dc) | |||
| # Set up the axis limits: | |||
| # * the 1st subplot is used as a reference for the x-axis limits | |||
| # * lightness values goes from 0 to 100 (y-axis limits) | |||
| ax.set_xlim(axs[0, 0].get_xlim()) | |||
| ax.set_ylim(0.0, 100.0) | |||
| # Set up labels for colormaps | |||
| ax.xaxis.set_ticks_position('top') | |||
| ticker = mpl.ticker.FixedLocator(locs) | |||
| ax.xaxis.set_major_locator(ticker) | |||
| formatter = mpl.ticker.FixedFormatter(cmap_list[i*dsub:(i+1)*dsub]) | |||
| ax.xaxis.set_major_formatter(formatter) | |||
| ax.xaxis.set_tick_params(rotation=50) | |||
| ax.set_ylabel('Lightness $L^*$', fontsize=12) | |||
| ax.set_xlabel(cmap_category + ' colormaps', fontsize=14) | |||
| fig.tight_layout(h_pad=0.0, pad=1.5) | |||
| plt.show() | |||
| mpl.rcParams.update({'font.size': 14}) | |||
| # Indices to step through colormap. | |||
| x = np.linspace(0.0, 1.0, 100) | |||
| gradient = np.linspace(0, 1, 256) | |||
| gradient = np.vstack((gradient, gradient)) | |||
| def plot_color_gradients(cmap_category, cmap_list): | |||
| fig, axs = plt.subplots(nrows=len(cmap_list), ncols=2) | |||
| fig.subplots_adjust(top=0.95, bottom=0.01, left=0.2, right=0.99, | |||
| wspace=0.05) | |||
| fig.suptitle(cmap_category + ' colormaps', fontsize=14, y=1.0, x=0.6) | |||
| for ax, name in zip(axs, cmap_list): | |||
| # Get RGB values for colormap. | |||
| rgb = mpl.colormaps[name](x)[np.newaxis, :, :3] | |||
| # Get colormap in CAM02-UCS colorspace. We want the lightness. | |||
| lab = cspace_converter("sRGB1", "CAM02-UCS")(rgb) | |||
| L = lab[0, :, 0] | |||
| L = np.float32(np.vstack((L, L, L))) | |||
| ax[0].imshow(gradient, aspect='auto', cmap=mpl.colormaps[name]) | |||
| ax[1].imshow(L, aspect='auto', cmap='binary_r', vmin=0., vmax=100.) | |||
| pos = list(ax[0].get_position().bounds) | |||
| x_text = pos[0] - 0.01 | |||
| y_text = pos[1] + pos[3]/2. | |||
| fig.text(x_text, y_text, name, va='center', ha='right', fontsize=10) | |||
| # Turn off *all* ticks & spines, not just the ones with colormaps. | |||
| for ax in axs.flat: | |||
| ax.set_axis_off() | |||
| plt.show() | |||
| for cmap_category, cmap_list in cmaps.items(): | |||
| plot_color_gradients(cmap_category, cmap_list) | |||
| @@ -0,0 +1,31 @@ | |||
| import struct | |||
| import numpy as np | |||
| from neural_net.neural_net import ModelData | |||
| class MNISTModelData(ModelData): | |||
| def __init__(self, fn_train_inputs, fn_train_targets, fn_test_inputs, fn_test_targets): | |||
| super().__init__( | |||
| self._get_images_from_idx(fn_train_inputs), | |||
| self._get_labels_from_idx(fn_train_targets), | |||
| self._get_images_from_idx(fn_test_inputs), | |||
| self._get_labels_from_idx(fn_test_targets) | |||
| ) | |||
# Debug aid: dump the first test image as a 28x28 grid
print(np.array(self.test_inputs[0]).reshape((28, 28)))
def _get_images_from_idx(self, file):
# IDX image format: big-endian header (magic number, image count,
# row count, column count) followed by raw uint8 pixel data.
with open(file, 'rb') as f:
magic, size = struct.unpack(">II", f.read(8))
nrows, ncols = struct.unpack(">II", f.read(8))
data = np.fromfile(f, dtype=np.dtype(np.uint8).newbyteorder('>'))
data = data.reshape((size, nrows * ncols)) / 255
# Invert so digits become dark strokes on a light background
return 1 - data
| def _get_labels_from_idx(self, file): | |||
| with open(file, 'rb') as f: | |||
| magic, size = struct.unpack(">II", f.read(8)) | |||
| data = np.fromfile(f, dtype=np.dtype(np.uint8).newbyteorder('>')) | |||
| return data | |||
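# A minimal usage sketch (hypothetical paths; the four IDX files come from the
# standard MNIST distribution and are not bundled with this repo):
#
#   data = MNISTModelData(
#       "datasets/mnist/train-images-idx3-ubyte",
#       "datasets/mnist/train-labels-idx1-ubyte",
#       "datasets/mnist/t10k-images-idx3-ubyte",
#       "datasets/mnist/t10k-labels-idx1-ubyte",
#   )
#   # training_inputs: (60000, 784) floats in [0, 1]; training_labels: uint8 digits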
| @@ -0,0 +1,67 @@ | |||
| from ui.app import App | |||
| if __name__ == '__main__': | |||
| app = App() | |||
| app.mainloop() | |||
| # import numpy as np | |||
| # from matplotlib import pyplot as plt | |||
| # | |||
| # import matplotlib | |||
| # | |||
| # matplotlib.use("TkAgg") | |||
| # np.random.seed(0) | |||
| # | |||
| # from utils.mnist import MNISTNeuralNet | |||
| # | |||
# # Show 8 decimal places and suppress scientific notation
# np.set_printoptions(precision=8, suppress=True)
| # | |||
| # from utils.load_mnist import get_test_images, get_test_labels, get_train_images, get_train_labels | |||
| # | |||
| # train_images = get_train_images() | |||
| # train_labels = get_train_labels() | |||
| # | |||
| # mnist_neural_net = MNISTNeuralNet() | |||
| # losses = mnist_neural_net.train(train_images, train_labels, 0.0001, 100) | |||
| # test_images = get_test_images() | |||
| # test_labels = get_test_labels() | |||
| # results = mnist_neural_net.forward(test_images) | |||
| # predictions = results.argmax(axis=1) | |||
| # | |||
| # correct = predictions == test_labels | |||
| # incorrect = predictions != test_labels | |||
| # accuracy = mnist_neural_net.accuracy(results, test_labels) | |||
| # # Create figure and axes | |||
| # fig, ax = plt.subplots(figsize=(10, 5)) | |||
| # | |||
| # ax.hist(test_labels[correct], bins=np.arange(11)-0.5, alpha=0.5, label="Correct", color="green") | |||
| # ax.hist(test_labels[incorrect], bins=np.arange(11)-0.5, alpha=0.5, label="Incorrect", color="red") | |||
| # ax.set_xticks(range(10)) | |||
| # ax.set_xlabel("True Label") | |||
| # ax.set_ylabel("Count") | |||
| # ax.set_title(f"Accuracy {accuracy}") | |||
| # ax.legend() | |||
| # | |||
| # fig.show() | |||
| # while True: | |||
| # plt.pause(0.1) | |||
| #################### | |||
| ## Draw image ## | |||
| #################### | |||
| # Create a figure and axes | |||
| # fig, ax = plt.subplots() | |||
| # Initial matrix displayed | |||
| # initial_data = np.array(images[0]) | |||
| # mat = ax.matshow(initial_data.reshape(28, 28), cmap='bwr') | |||
| # fig.show() | |||
| # Redraw the canvas | |||
| # fig.canvas.draw() | |||
| # fig.canvas.flush_events() | |||
| # | |||
| # plt.pause(20) | |||
| @@ -0,0 +1,95 @@ | |||
| from abc import abstractmethod | |||
| import numpy as np | |||
| from neural_net.transform_layer import Layer | |||
| class ActivationLayer(Layer): | |||
| def __init__(self, index, input_dim, output_dim, weights=None, biases=None): | |||
| super().__init__('ActivationLayer', index, input_dim, output_dim) | |||
| self.type = 'ActivationLayer' | |||
| self.subtype = '' | |||
| self.inputs = np.array([]) | |||
| self.output = np.array([]) | |||
| self.z = np.array([]) | |||
| self.gradient_clip = 1.0 | |||
| # Initialize weights and biases | |||
| if weights is not None: | |||
| self.weights = weights | |||
| else: | |||
| self.initialize_weights() | |||
| if biases is not None: | |||
| self.biases = biases | |||
| else: | |||
| self.initialize_biases() | |||
| def describe(self): | |||
| return f"{self.type} ({self.input_dim}x{self.output_dim} neurons, {self.subtype} activation)" | |||
| @abstractmethod | |||
| def initialize_weights(self): | |||
| pass | |||
| @abstractmethod | |||
| def initialize_biases(self): | |||
| pass | |||
| def forward(self, inputs: np.array): | |||
| self.inputs = inputs | |||
| self.z = np.dot(self.inputs, self.weights) + self.biases | |||
self.output = self.activation(self.z) # Calls the subclass's activation function (e.g. Sigmoid)
| return self.output | |||
| def backward(self, dL_dout, learning_rate): | |||
| """ | |||
| Backpropagate the error and update weights and biases. | |||
| :param dL_dout: Gradient of loss with respect to layer outputs | |||
| :param learning_rate: Learning rate for weight updates | |||
| :return: Gradient with respect to inputs for previous layer (dL/dinputs) | |||
| """ | |||
| # Activation derivative dout/dz | |||
| # This tells you how much the output of the activation function changes with respect to the pre-activation value z. | |||
| # Sigmoid derivative formula: σ(z) * (1 - σ(z)) | |||
| dout_dz = self.activation_derivative(self.output) | |||
| # Gradient of the loss with respect to weights (dL/dweights) | |||
| # This represents how much the loss changes when the weights change. | |||
| # Formula: dL/dweights = inputs × dL/dout × σ′(z) | |||
| dL_dweights = np.clip(np.dot(self.inputs.T, dL_dout * dout_dz), -self.gradient_clip, self.gradient_clip) | |||
| dL_dbias = np.sum(dL_dout * dout_dz, axis=0) | |||
| # Gradient of the loss with respect to inputs (dL/dinputs) | |||
| # This is the gradient of the loss with respect to the input of the neuron or layer, often needed if you want to backpropagate further. | |||
| # Formula: dL / dinputs = dL/dout × σ′(z) × weights | |||
| dL_dinputs = np.dot(dL_dout * dout_dz, self.weights.T) | |||
# Note: the weight gradient is already clipped inline above to
# [-gradient_clip, gradient_clip]; bias clipping is currently disabled.
# np.clip(dL_dbias, -10.0, 10.0, out=dL_dbias)
| # Adjust weights and biases | |||
| self.weights -= learning_rate * dL_dweights | |||
| self.biases -= learning_rate * dL_dbias | |||
| return dL_dinputs, dL_dweights, dL_dbias, self.weights, self.biases | |||
| def reset(self): | |||
| self.initialize_weights() | |||
| self.initialize_biases() | |||
| @abstractmethod | |||
| def activation(self, raw_outputs: np.array): | |||
| """ | |||
| Apply the activation function (Sigmoid, ReLU, etc.) | |||
| """ | |||
| pass | |||
| @abstractmethod | |||
| def activation_derivative(self, outputs: np.array): | |||
| """ | |||
| Compute the derivative of the activation function | |||
| """ | |||
| pass | |||
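# A worked 1x1 example of the chain rule above (a sketch): with input 1.0,
# weight 0.5, bias 0.0 and sigmoid activation, z = 0.5, out = sigma(0.5) ~= 0.6225
# and dout/dz = 0.6225 * (1 - 0.6225) ~= 0.2350; with dL/dout = 1.0:
#   dL/dweights = 1.0 * 1.0 * 0.2350 = 0.2350
#   dL/dbias    = 1.0 * 0.2350       = 0.2350
#   dL/dinputs  = 1.0 * 0.2350 * 0.5 = 0.1175
# (The ReLU and sigmoid layer unit tests exercise exactly these formulas.)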
| @@ -0,0 +1,23 @@ | |||
| import numpy as np | |||
| from neural_net.activation_layers.activation_layer import ActivationLayer | |||
| from neural_net.functions.activation import relu_activation, relu_derivative_activation | |||
| class ReluLayer(ActivationLayer): | |||
| def __init__(self, index, input_dim, output_dim, weights=None, biases=None): | |||
| super().__init__(index, input_dim, output_dim, weights, biases) | |||
| self.subtype = 'RELU' | |||
| def initialize_weights(self): | |||
| # He initialization (input_dim x output_dim) | |||
| self.weights = np.random.randn(self.input_dim, self.output_dim) * np.sqrt(2.0 / self.input_dim) | |||
| def initialize_biases(self): | |||
| self.biases = np.zeros((1, self.output_dim)) # Biases initialized to zero | |||
| def activation(self, outputs: np.array): | |||
| return relu_activation(outputs) | |||
| def activation_derivative(self, outputs: np.array): | |||
| return relu_derivative_activation(outputs) | |||
| @@ -0,0 +1,24 @@ | |||
| import numpy as np | |||
| from neural_net.activation_layers.activation_layer import ActivationLayer | |||
from neural_net.functions.activation import sigmoid_activation, sigmoid_derivative_activation
| class SigmoidLayer(ActivationLayer): | |||
def __init__(self, index, input_dim, output_dim, weights=None, biases=None):
super().__init__(index, input_dim, output_dim, weights, biases)
| self.subtype = 'Sigmoid' | |||
| def initialize_weights(self): | |||
| # Xavier initialization for sigmoid activation | |||
| limit = np.sqrt(6 / (self.input_dim + self.output_dim)) | |||
| self.weights = np.random.uniform(-limit, limit, (self.input_dim, self.output_dim)) | |||
| def initialize_biases(self): | |||
| self.biases = np.zeros((1, self.output_dim)) # Biases initialized to zero | |||
def activation(self, outputs: np.array):
return sigmoid_activation(outputs)
| def activation_derivative(self, outputs: np.array): | |||
| return sigmoid_derivative_activation(outputs) | |||
| @@ -0,0 +1,47 @@ | |||
| import time | |||
| import numpy as np | |||
| class Epoch: | |||
| def __init__(self, epoch, inputs, labels, learning_rate, batch_size): | |||
| self.epoch = epoch | |||
| self.loss = -1.0 | |||
| self.duration = 0 | |||
| self.learning_rate = learning_rate | |||
| self.batch_size = batch_size | |||
| self.batches = [] | |||
| for i in range(0, len(inputs), self.batch_size): | |||
| self.batches.append(TrainingBatch(i, inputs[i:i + batch_size], labels[i:i + batch_size])) | |||
| self.layer_dl_gradients = [] | |||
| self.layer_dl_biases = [] | |||
| self.layer_weights = [] | |||
self.finished = False
self.start_time = None
self.end_time = None
| def start(self): | |||
| self.start_time = time.time() | |||
| def finish(self, neural_net): | |||
| self.finished = True | |||
| self.trained_weights = neural_net.get_all_weights() | |||
| self.end_time = time.time() | |||
| self.duration = self.end_time - self.start_time | |||
def all_predictions(self):
# np.concatenate takes the list directly; wrapping in np.array first
# fails when the final batch is smaller than the rest (ragged shapes)
return np.concatenate([batch.predictions for batch in self.batches])
def all_labels(self):
return np.concatenate([batch.labels for batch in self.batches])
def all_inputs(self):
return np.concatenate([batch.inputs for batch in self.batches])
| def print_epoch(self): | |||
| print(f"Epoch {self.epoch}:") | |||
| print(f"Loss: {self.loss}") | |||
| print(f"dL / Gradients: {self.layer_dl_gradients}") | |||
| print(f"dL / Bias: {self.layer_dl_gradients}") | |||
| class TrainingBatch: | |||
| def __init__(self, batch_num, inputs, labels): | |||
| self.batch_num = batch_num | |||
| self.inputs = inputs | |||
| self.labels = labels | |||
| self.predictions = [] | |||
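# Example (a sketch): with 2500 training rows and batch_size=1000, an Epoch
# holds three TrainingBatch objects covering rows 0-999, 1000-1999 and 2000-2499;
# batch_num records each batch's starting row offset (0, 1000, 2000).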
| @@ -0,0 +1,13 @@ | |||
| import numpy as np | |||
def relu_activation(outputs):
# ReLU: max(0, z), element-wise
return np.maximum(0, outputs)
def relu_derivative_activation(outputs):
# ReLU'(z): 1 where z > 0, else 0
return np.where(outputs > 0, 1, 0)
def sigmoid_activation(outputs):
# Sigmoid: 1 / (1 + e^-z)
return 1 / (1 + np.exp(-outputs))
def sigmoid_derivative_activation(outputs):
# Note: expects the *activated* output sigma(z), since sigma'(z) = sigma(z) * (1 - sigma(z))
return outputs * (1 - outputs)
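# A quick sanity check of the helpers above (a sketch). Note that the sigmoid
# derivative takes the *activated* output, matching how ActivationLayer.backward
# calls activation_derivative(self.output):
if __name__ == "__main__":
    z = np.array([-2.0, 0.0, 3.0])
    print(relu_activation(z))                 # [0. 0. 3.]
    print(relu_derivative_activation(z))      # [0 0 1]
    a = sigmoid_activation(z)
    print(sigmoid_derivative_activation(a))   # sigma(z) * (1 - sigma(z))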
| @@ -0,0 +1,27 @@ | |||
| import numpy as np | |||
def cross_entropy_loss(outputs, targets, clip=True):
"""
outputs: [
[ 0.32, 0.12, 0.04 ],
[ 0.62, 0.02, 0.14 ]
]
targets: [ 2, 1 ]
:param outputs: np.array: Matrix of predicted probability vectors, one row per sample
:param targets: np.array: Vector of class indices representing the actual values
:param clip: boolean, whether to clip the output probabilities for numerical stability
:return: float: the mean cross-entropy loss over the batch
"""
| if clip: | |||
| # Clipping the predictions for numerical stability | |||
| outputs = np.clip(outputs, 1e-12, 1 - 1e-12) | |||
| # Calculate cross-entropy loss and average over batch size | |||
| m = targets.shape[0] | |||
| log_likelihood = -np.log(outputs[range(m), targets]) | |||
| return np.sum(log_likelihood) / m # Average loss | |||
| def cross_entropy_derivative_loss(outputs, targets): | |||
| # One-hot encode the labels | |||
| y_true = np.eye(outputs.shape[1])[targets] | |||
| # Derivative of cross-entropy with respect to softmax inputs | |||
| return outputs - y_true | |||
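# A worked example (a sketch): the loss is the mean of -log p[target] over the
# batch, here -(log 0.7 + log 0.6) / 2 ~= 0.4338, and the derivative is simply
# the outputs minus the one-hot targets:
if __name__ == "__main__":
    outputs = np.array([[0.1, 0.2, 0.7],
                        [0.2, 0.6, 0.2]])
    targets = np.array([2, 1])
    print(cross_entropy_loss(outputs, targets))             # ~0.4338
    print(cross_entropy_derivative_loss(outputs, targets))  # [[0.1, 0.2, -0.3], [0.2, -0.4, 0.2]]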
| @@ -0,0 +1,34 @@ | |||
| import numpy as np | |||
| from neural_net.activation_layers.relu_layer import ReluLayer | |||
| from neural_net.functions.loss import cross_entropy_loss, cross_entropy_derivative_loss | |||
| from neural_net.neural_net import NeuralNet | |||
| from neural_net.transform_layer import SoftMaxLayer | |||
| class MNISTNeuralNet(NeuralNet): | |||
| def __init__(self): | |||
| super().__init__(layers=[ | |||
| ReluLayer(0, 784, 121), | |||
| ReluLayer(1, 121, 10), | |||
| SoftMaxLayer(2, 10) | |||
| ]) | |||
| def backward(self, dL_dout, epoch): | |||
| return super().backward(dL_dout, epoch) | |||
| def loss(self, y_pred: np.array, y_actual: np.array): | |||
| return cross_entropy_loss(y_pred, y_actual) | |||
| def loss_derivative(self, y_pred: np.array, targets: np.array): | |||
| return cross_entropy_derivative_loss(y_pred, targets) | |||
| def describe(self): | |||
| """Return a human-readable string of the model architecture.""" | |||
| architecture_info = "" | |||
| for layer in self.layers: | |||
| architecture_info += f"{layer.describe()}\n" | |||
| return architecture_info.strip() | |||
| def predict(self, inputs): | |||
| raw_outputs = super().predict(inputs) | |||
| return raw_outputs, raw_outputs.argmax(axis=1) | |||
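# Example (a sketch): for a batch `images` of flattened 28x28 inputs with shape
# (n, 784), predict() returns both the softmax outputs and the argmax digit per row:
#
#   net = MNISTNeuralNet()
#   probs, digits = net.predict(images)  # probs: (n, 10), digits: (n,)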
| @@ -0,0 +1,127 @@ | |||
from abc import ABC, abstractmethod
| from enum import Enum | |||
| import numpy as np | |||
| from neural_net.epoch import Epoch | |||
| from neural_net.transform_layer import Layer | |||
| class ModelData: | |||
| def __init__(self, training_inputs, training_targets, test_inputs, test_targets): | |||
| self.is_loaded = False | |||
| self.training_inputs = training_inputs | |||
| self.training_labels = training_targets | |||
| self.test_inputs = test_inputs | |||
| self.test_labels = test_targets | |||
| # class TrainingSession: | |||
| # def __init__(self, training_data: ModelData, learning_rate: float, nr_epochs: int, batch_size: int = 1000): | |||
| # self.training_data = training_data | |||
| # self.learning_rate = learning_rate | |||
| # self.nr_epochs = nr_epochs | |||
| # self.batch_size = batch_size | |||
| # self.epochs: [Epoch] = [] | |||
| # for i in range(self.nr_epochs): | |||
| # self.epochs.append( | |||
| # Epoch(i, self.training_data.training_inputs, self.training_data.training_labels, self.batch_size)) | |||
| # | |||
| # def get_total_training_duration(self): | |||
| # duration = 0.0 | |||
| # for epoch in self.epochs: | |||
| # duration += epoch.duration | |||
| # return duration | |||
class NeuralNet(ABC):
def __init__(self, layers: list[Layer]):
| self.layers = layers | |||
| self.last_loss = None | |||
| self.last_accuracy = None | |||
| def forward(self, inputs): | |||
| outputs = inputs | |||
| for layer in self.layers: | |||
| outputs = layer.forward(outputs) | |||
| return outputs | |||
| def reset(self): | |||
| for layer in self.layers: | |||
| layer.reset() | |||
| def backward(self, dL_dout, epoch): | |||
| layer_dl_gradients = [] | |||
| layer_dl_bias = [] | |||
| layer_weights = [] | |||
| layer_biases = [] | |||
| for idx, layer in reversed(list(enumerate(self.layers))): | |||
| dL_dout, dl_gradients, dl_biases, weights, biases = layer.backward(dL_dout, epoch.learning_rate) | |||
| if dl_gradients is not None: | |||
| layer_dl_gradients.append(dl_gradients) | |||
| if dl_biases is not None: | |||
| layer_dl_bias.append(dl_biases) | |||
| if weights is not None: | |||
| layer_weights.append(weights) | |||
| if biases is not None: | |||
| layer_biases.append(biases) | |||
| return layer_dl_gradients, layer_dl_bias, layer_weights, layer_biases | |||
| # def train(self, training_run: TrainingRun): | |||
| # self.training_runs.append(training_run) | |||
| # | |||
| # for epoch in training_run.epochs: | |||
| # epoch.start() | |||
| # | |||
| # for batch in epoch.batches: | |||
| # batch.predictions = self.forward(batch.inputs) | |||
| # dL_dout = self.loss_derivative(batch.predictions, batch.labels) | |||
| # | |||
| # layer_dl_gradients, layer_dl_biases, layer_weights, layer_biases = self.backward(dL_dout, training_run.learning_rate, epoch) | |||
| # epoch.layer_dl_gradients.append(layer_dl_gradients) | |||
| # epoch.layer_dl_biases.append(layer_dl_biases) | |||
| # | |||
| # epoch.finish() | |||
| # epoch.loss = self.loss(epoch.all_predictions(), epoch.all_labels()) | |||
| # | |||
| # if training_run.epoch_callback is not None: | |||
| # training_run.epoch_callback(training_run, epoch) | |||
| # | |||
| # self.recalculate_loss(training_run.training_data.test_inputs, training_run.training_data.test_labels) | |||
| # self.recalculate_loss(training_run.training_data.test_inputs, training_run.training_data.test_labels) | |||
| def get_all_weights(self): | |||
| all_weights = [] | |||
| for layer in self.layers: | |||
| if hasattr(layer, 'weights'): | |||
| all_weights.append(layer.weights) | |||
| return all_weights | |||
| def recalculate_accuracy(self, inputs, labels): | |||
| raw_outputs = self.forward(inputs) | |||
| predictions = raw_outputs.argmax(axis=1) | |||
| num_correct_predictions = 0 | |||
| for idx, prediction in enumerate(predictions): | |||
| if prediction == labels[idx]: | |||
| num_correct_predictions += 1 | |||
| self.last_accuracy = num_correct_predictions / len(predictions) | |||
| return self.last_accuracy | |||
| def recalculate_loss(self, inputs, labels): | |||
| raw_outputs = self.forward(inputs) | |||
| self.last_loss = self.loss(np.array(raw_outputs), np.array(labels)) | |||
| return self.last_loss | |||
| @abstractmethod | |||
| def loss(self, outputs: np.array, labels: np.array): | |||
| pass | |||
| @abstractmethod | |||
| def loss_derivative(self, outputs: np.array, labels: np.array): | |||
| pass | |||
| def predict(self, inputs): | |||
| return self.forward(inputs) | |||
| @@ -0,0 +1,65 @@ | |||
| from neural_net.epoch import Epoch | |||
| from neural_net.neural_net import NeuralNet, ModelData | |||
| class NeuralNetTrainer: | |||
| def __init__(self, neural_net: NeuralNet, model_data: ModelData, learning_rate: float, batch_size: int): | |||
| self.neural_net = neural_net | |||
| self.model_data = model_data | |||
| self.is_running = False | |||
| self.epoch_history = [] | |||
| self.learning_rate = learning_rate | |||
| self.batch_size = batch_size | |||
| def set_learning_rate(self, learning_rate: float): | |||
| self.learning_rate = learning_rate | |||
| def set_batch_size(self, batch_size: int): | |||
| self.batch_size = batch_size | |||
| def run_epoch(self): | |||
| epoch = Epoch(len(self.epoch_history), | |||
| self.model_data.training_inputs, | |||
| self.model_data.training_labels, | |||
| self.learning_rate, | |||
| self.batch_size | |||
| ) | |||
| self._train_one_epoch(epoch) | |||
| return epoch | |||
| def start(self, on_epoch_finish=None, on_finish=None): | |||
| self.is_running = True | |||
# Train until stop() is called (e.g. from another thread)
while self.is_running:
# Perform one epoch of training
# In the future, we will apply a learning-rate schedule here
epoch = self.run_epoch()
| if on_epoch_finish is not None: | |||
| on_epoch_finish(epoch) | |||
| if on_finish is not None: | |||
| on_finish() | |||
| self.stop() | |||
| def stop(self): | |||
| if self.is_running: | |||
| self.is_running = False | |||
| def _train_one_epoch(self, epoch: Epoch): | |||
| epoch.start() | |||
| for batch in epoch.batches: | |||
| batch.predictions = self.neural_net.forward(batch.inputs) | |||
| dL_dout = self.neural_net.loss_derivative(batch.predictions, batch.labels) | |||
| layer_dl_gradients, layer_dl_biases, layer_weights, layer_biases = self.neural_net.backward(dL_dout, epoch) | |||
| epoch.layer_dl_gradients.append(layer_dl_gradients) | |||
| epoch.layer_dl_biases.append(layer_dl_biases) | |||
| epoch.finish(self.neural_net) | |||
| epoch.loss = self.neural_net.loss(epoch.all_predictions(), epoch.all_labels()) | |||
| self.epoch_history.append(epoch) | |||
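# A minimal usage sketch (hypothetical wiring; `model_data` is an MNISTModelData
# instance loaded from the IDX files):
#
#   trainer = NeuralNetTrainer(MNISTNeuralNet(), model_data,
#                              learning_rate=0.0001, batch_size=1000)
#   epoch = trainer.run_epoch()        # one synchronous epoch
#   print(epoch.loss, epoch.duration)
#   trainer.start(on_epoch_finish=lambda e: print(e.loss))  # loop until stop()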
| @@ -0,0 +1,73 @@ | |||
from abc import ABC, abstractmethod
| import numpy as np | |||
class Layer(ABC):
def __init__(self, layer_type, index, input_dim, output_dim):
self.type = layer_type
| self.index = index | |||
| self.input_dim = input_dim | |||
| self.output_dim = output_dim | |||
| @abstractmethod | |||
| def forward(self, inputs): | |||
| raise NotImplementedError("This should be overridden by subclasses") | |||
| @abstractmethod | |||
| def backward(self, dL_dout, learning_rate): | |||
| raise NotImplementedError("This should be overridden by subclasses") | |||
| @abstractmethod | |||
| def reset(self): | |||
| raise NotImplementedError("This should be overridden by subclasses") | |||
| class TransformLayer(Layer): | |||
| def __init__(self, index, size): | |||
| super().__init__('TransformLayer', index, size, size) | |||
| def describe(self): | |||
| return self.type | |||
| def forward(self, inputs): | |||
| raise NotImplementedError("This should be overridden by subclasses") | |||
| def backward(self, dL_dout, learning_rate): | |||
| return dL_dout, None, None, None, None # This is the gradient to propagate to the previous layer | |||
| def reset(self): | |||
| pass | |||
| class NormalizeLayer(TransformLayer): | |||
| def __init__(self, index, size): | |||
| super().__init__(index, size) | |||
| self.type = 'NormalizeLayer' | |||
def forward(self, inputs):
"""
Normalizes the input so each vector (last axis) sums to 1.
[1, 5, 5, 3, 6] => [0.05, 0.25, 0.25, 0.15, 0.3]
:param inputs: np.array(float)
:return: np.array(float)
"""
return inputs / inputs.sum(axis=-1, keepdims=True)
| class SoftMaxLayer(TransformLayer): | |||
| def __init__(self, index, size): | |||
| super().__init__(index, size) | |||
| self.type = 'SoftMaxLayer' | |||
def forward(self, inputs):
"""
Normalizes the input vector, but "pushes" higher values to dominate the
probability distribution
[1, 5, 5, 3, 6] => [0.004, 0.205, 0.205, 0.028, 0.558]
:param inputs: np.array(float)
:return: np.array(float)
"""
input_ex = np.exp(inputs - inputs.max(axis=-1, keepdims=True)) # Subtract the per-row max for numerical stability
s = np.sum(input_ex, axis=-1, keepdims=True)
# To prevent division by zero, ensure that the sum is not zero
if np.any(s == 0):
return np.ones_like(input_ex) / input_ex.shape[-1] # Return a uniform distribution if any sum is 0
return input_ex / s
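# A quick check of the docstring examples above (a sketch):
if __name__ == "__main__":
    v = np.array([1.0, 5.0, 5.0, 3.0, 6.0])
    print(NormalizeLayer(0, 5).forward(v))  # [0.05 0.25 0.25 0.15 0.3]
    print(SoftMaxLayer(0, 5).forward(v))    # ~[0.004 0.205 0.205 0.028 0.558]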
| @@ -0,0 +1,59 @@ | |||
| import numpy as np | |||
# Example raw network outputs (note: not normalized softmax probabilities)
| outputs = np.array([ | |||
| [ | |||
| [ | |||
| 0.90924643, 0.0, 0.26800049, 0.0, 0.14153697, 0.07644807, | |||
| 0.0, 0.63928418, 0.14899383, 0.29679539, 0.29560591, 0.46324955, | |||
| 0.38955634, 0.0, 0.05094845, 0.0, 0.0, 0.26734416, 0.0, | |||
| 0.28399383, 0.0429699, 0.68988006, 0.0, 0.0, 0.0, 0.02901288, | |||
| 0.0, 0.01076904, 0.0, 0.41230365, 0.58630857, 0.0, 0.29906131, | |||
| 0.0, 0.00339327, 0.47909497, 0.07787446, 0.0, 0.0, 0.0, 0.0, | |||
| 0.0, 0.0, 0.59843748, 0.18691183, 0.0, 0.0, 0.0, 0.84100045, | |||
| 0.24468988, 0.0144432, 0.0, 0.27832373, 0.0, 0.45574082, | |||
| 0.16037272, 0.0, 0.28562163, 0.0, 0.0, 0.44667622, 0.0, 0.0, | |||
| 0.29725156, 0.0, 0.01500714, 0.51253602, 0.18559459, 0.07919077, | |||
| 0.0, 0.15155614, 0.0, 0.16996095, 0.26832836, 0.0, 0.56057083, | |||
| 0.47535547, 0.0, 0.08280879, 0.0, 0.07266015, 0.43079376, | |||
| 0.55633086, 0.0, 0.13123258, 0.33282808, 0.0, 0.73207594, 0.0, | |||
| 0.08246748, 0.0, 0.0, 0.0, 0.03605279, 0.56645505, 0.0, | |||
| 0.66074054, 0.0, 0.0, 0.07871833, 0.0, 0.0, 0.0, 0.0, | |||
| 0.0, 0.0, 0.26077944, 0.0, 0.0, 0.19883228, 0.26075606, | |||
| 0.0, 0.55120887, 0.0, 0.0, 0.13896239, 0.8079261, 0.0 | |||
| ], | |||
| [ | |||
| 1.3890246, 0.0, 0.0176582, 0.41937874, 0.01668789, 0.08115837, | |||
| 0.0, 0.0, 0.0, 0.03283852, 0.0, 0.28331658, 0.0, 0.56971081, | |||
| 1.29951652, 0.0, 0.05585489, 0.0, 0.0, 0.0, 0.4555721, 0.0, | |||
| 0.0, 0.0, 1.13440652, 0.3462467, 0.53066361, 0.85311426, | |||
| 0.13320967, 0.61478612, 0.0, 0.0, 0.0, 0.0, 0.0, 0.04859889, | |||
| 0.0, 0.0884254, 0.0, 0.56573542, 0.18211658, 0.0, 0.24407104, | |||
| 0.0, 0.07133323, 0.0, 0.0, 0.98712028, 0.0, 0.06996351, | |||
| 0.70575429, 0.30689567, 0.47709064, 0.07469221, 0.40548246, | |||
| 0.09671662, 0.56150121, 0.0, 0.7116001, 0.57194077, 0.0, | |||
| 0.10528511, 0.20317026, 0.03516737, 0.0, 0.0, 0.10198436, | |||
| 0.0, 0.0, 0.0, 0.35702522, 0.0, 0.0, 0.32883485, 0.0, | |||
| 0.0, 0.18996724, 0.0, 0.0, 0.0, 0.06601356, 0.0, | |||
| 0.41925782, 0.0, 0.0, 0.07929863, 0.28089351, 0.0, | |||
| 0.25405591, 0.09954264, 1.05735563, 0.0, 0.57732162, 0.0, | |||
| 0.05791431, 0.0, 0.42524903, 0.0, 0.0, 0.0, 0.0, 0.0, | |||
| 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.13586283, 0.23484103, | |||
| 0.69677156, 0.0, 0.0, 0.08609836, 0.89882583 | |||
| ] | |||
| ] | |||
| ]) | |||
| labels = [7, 2] | |||
# Convert labels to one-hot encoding (width must match the output dimension)
num_classes = outputs.shape[-1]
| labels_one_hot = np.zeros((len(labels), num_classes)) | |||
| for i, label in enumerate(labels): | |||
| labels_one_hot[i, label] = 1 | |||
| # Calculate the loss derivative | |||
| loss_derivative = outputs - labels_one_hot | |||
| print("Loss Derivative:") | |||
| print(loss_derivative) | |||
| @@ -0,0 +1,78 @@ | |||
| import unittest | |||
| import numpy as np | |||
| from neural_net.mnist import MNISTNeuralNet | |||
| from neural_net.functions.loss import cross_entropy_loss | |||
| # noinspection PyMethodMayBeStatic | |||
| class MNISTNeuralNetTests(unittest.TestCase): | |||
| def test_loss(self): | |||
| mnist = MNISTNeuralNet() | |||
| # Sample predictions and labels for testing the loss function | |||
| predictions = np.array([[0.1, 0.2, 0.7], # Example of a softmax output (probabilities) | |||
| [0.2, 0.6, 0.2]]) | |||
| # Corresponding labels (correct class indices) | |||
| labels = np.array([2, 1]) # Labels are class indices (not one-hot) | |||
# Expected loss, computed directly with the underlying loss function
# (mnist.loss should simply delegate to cross_entropy_loss)
expected_loss = cross_entropy_loss(predictions, labels)
| # Call the loss function | |||
| computed_loss = mnist.loss(predictions, labels) | |||
| # Assert that the computed loss matches the expected loss | |||
| self.assertAlmostEqual(computed_loss, expected_loss, places=5, msg="Loss function is incorrect") | |||
| def test_derivative_loss(self): | |||
| mnist = MNISTNeuralNet() | |||
| # Sample predictions and labels for testing the derivative of the loss function | |||
| predictions = np.array([[0.1, 0.2, 0.7], # Example of softmax output (probabilities) | |||
| [0.2, 0.6, 0.2]]) | |||
| # Corresponding labels (correct class indices) | |||
| labels = np.array([2, 1]) # Labels are class indices | |||
# Expected derivative: predictions minus one-hot targets
expected_derivative = np.array([[0.1, 0.2, -0.3],
[0.2, -0.4, 0.2]])
| # Call the derivative loss function | |||
| computed_derivative = mnist.loss_derivative(predictions, labels) | |||
| # Assert that the computed derivative matches the expected derivative | |||
| np.testing.assert_array_almost_equal(computed_derivative, expected_derivative, decimal=5, | |||
| err_msg="Derivative of loss function is incorrect") | |||
| def test_derivative_loss2(self): | |||
| mnist = MNISTNeuralNet() | |||
| # Given outputs | |||
| outputs = np.array([ | |||
| [0.06873367, 0.043651, 0.043651, 0.05235898, 0.043651, 0.043651, | |||
| 0.043651, 0.043651, 0.0563062, 0.043651], | |||
| [0.043651, 0.043651, 0.05704588, 0.0551587, 0.05460022, 0.043651, | |||
| 0.043651, 0.043651, 0.07723706, 0.05474726] | |||
| ]) | |||
| # Labels | |||
| labels = [7, 2] | |||
| num_classes = 10 | |||
| # Convert labels to one-hot encoding | |||
| labels_one_hot = np.zeros((len(labels), num_classes)) | |||
| for i, label in enumerate(labels): | |||
| labels_one_hot[i, label] = 1 | |||
| # Calculate the expected loss derivative | |||
| expected_loss_derivative = outputs - labels_one_hot | |||
| # Call the derivative loss function | |||
| computed_loss_derivative = mnist.loss_derivative(outputs, labels) | |||
| # Assert that the computed derivative matches the expected derivative | |||
| np.testing.assert_array_almost_equal(computed_loss_derivative, expected_loss_derivative, decimal=5, | |||
| err_msg="Derivative of loss function is incorrect") | |||
| @@ -0,0 +1,154 @@ | |||
| import unittest | |||
| import numpy as np | |||
| from neural_net.activation_layers.relu_layer import ReluLayer | |||
| # noinspection PyMethodMayBeStatic | |||
| class ReluLayerTests(unittest.TestCase): | |||
| def test_relu_layer_1x1(self): | |||
| ############## | |||
| # Arrange # | |||
| ############## | |||
| inputs = np.array([[1.0]]) | |||
| weights = np.array([[0.5]]) | |||
| biases = np.array([0.0]) | |||
| learning_rate = 0.001 | |||
| # Pre-activation value (z) | |||
| # This is the intermediate value calculated as the weighted sum of inputs plus the bias. | |||
| z = np.dot(inputs, weights) + biases | |||
| # ReLU activation: f(z) = max(0, z) | |||
| # The expected output after applying the ReLU activation function | |||
| expected_output = np.maximum(0, z) | |||
| # Loss gradient dL/dout | |||
| # Represents how much the loss changes when the output changes. | |||
| dL_dout = np.array([[1.0]]) | |||
| # Activation derivative dout/dz | |||
| # For ReLU: If z > 0, dout/dz = 1; otherwise, dout/dz = 0 | |||
| dout_dz = np.where(z > 0, 1.0, 0.0) | |||
| # Gradient of the loss with respect to weights (dL/dweights) | |||
| # This represents how much the loss changes when the weights change. | |||
| # Formula: dL/dweights = inputs × dL/dout × σ′(z) | |||
| expected_dl_dweights = inputs * dL_dout * dout_dz | |||
| # Gradient of the loss with respect to the bias (dL/dbias) | |||
| expected_dL_dbias = np.sum(dL_dout * dout_dz) | |||
| # Gradient of the loss with respect to inputs (dL/dinputs) | |||
| # This is the gradient of the loss with respect to the input of the neuron or layer, often needed if you want to backpropagate further. | |||
| # Formula: dL / dinputs = dL/dout × σ′(z) × weights | |||
| expected_dl_dinputs = dL_dout * dout_dz * weights | |||
| # Calculate expected new weights and biases | |||
| expected_weights = weights - learning_rate * expected_dl_dweights | |||
| expected_biases = biases - learning_rate * expected_dL_dbias | |||
# Initialize the ReLU layer (index 0; dims follow the weight matrix shape)
layer = ReluLayer(0, weights.shape[0], weights.shape[1], weights=weights, biases=biases)
| ############## | |||
| # Act # | |||
| ############## | |||
| # Forward pass | |||
| output = layer.forward(inputs) | |||
# Backward pass (backward returns a 5-tuple; the input gradient is first)
dl_dinputs, *_ = layer.backward(dL_dout, learning_rate)
| ############## | |||
| # Assert # | |||
| ############## | |||
| # Forward output correctness | |||
| self.assertTrue(np.allclose(output, expected_output, atol=1e-6), | |||
| f"Forward output incorrect: Actual: {output}, Expected: {expected_output}") | |||
| # Backward pass correctness | |||
| self.assertTrue(np.allclose(dl_dinputs, expected_dl_dinputs, atol=1e-6), | |||
| f"Inputs derivative incorrect Actual: {dl_dinputs}, expected: {expected_dl_dinputs}") | |||
| self.assertTrue(np.allclose(layer.weights, expected_weights, atol=1e-6), | |||
| f"Weight update incorrect Actual: {layer.weights}, expected: {expected_weights}") | |||
| self.assertTrue(np.allclose(layer.biases, expected_biases, atol=1e-6), | |||
| f"Bias update incorrect Actual: {layer.biases}, expected: {expected_biases}") | |||
| def test_relu_layer_2x2(self): | |||
| ############## | |||
| # Arrange # | |||
| ############## | |||
| inputs = np.array([[1.0, 2.0], | |||
| [3.0, 4.0]]) # 2x2 input matrix | |||
| weights = np.array([[0.5, 0.2], | |||
| [0.3, 0.7]]) # 2x2 weight matrix | |||
| biases = np.array([0.1, -0.1]) # 2 biases, one for each neuron | |||
| learning_rate = 0.001 # Learning rate for weight updates | |||
| # Pre-activation value (z) | |||
| # z = inputs.dot(weights) + biases | |||
| z = np.dot(inputs, weights) + biases | |||
| # Expected output using the ReLU activation function | |||
| expected_output = np.maximum(0, z) # Apply ReLU | |||
| # Loss gradient dL/dout (assuming a gradient of 1 for simplicity) | |||
| dL_dout = np.array([[1.0, 1.0], | |||
| [1.0, 1.0]]) | |||
| # Activation derivative dout/dz | |||
| # For ReLU: dout/dz = 1 where z > 0, and dout/dz = 0 where z <= 0 | |||
| dout_dz = np.where(z > 0, 1.0, 0.0) | |||
# Expected gradients (for backpropagation)
# Expected gradients with respect to weights; ActivationLayer clips these
# to [-1, 1] (its gradient_clip default), so mirror that clipping here
expected_dl_dweights = np.clip(np.dot(inputs.T, dL_dout * dout_dz), -1.0, 1.0)
| # Expected gradients with respect to biases | |||
| expected_dL_dbias = np.sum(dL_dout * dout_dz, axis=0) | |||
| # Expected gradients with respect to inputs | |||
| expected_dl_dinputs = np.dot(dL_dout * dout_dz, weights.T) | |||
| # Expected updated weights and biases after backpropagation | |||
| expected_weights = weights - learning_rate * expected_dl_dweights | |||
| expected_biases = biases - learning_rate * expected_dL_dbias | |||
# Initialize the ReLU Layer (index 0)
layer = ReluLayer(0, weights.shape[0], weights.shape[1], weights=weights, biases=biases)
| ############## | |||
| # Act # | |||
| ############## | |||
| # Forward pass | |||
| output = layer.forward(inputs) | |||
# Backward pass (backward returns a 5-tuple; the input gradient is first)
dl_dinputs, *_ = layer.backward(dL_dout, learning_rate)
| ############## | |||
| # Assert # | |||
| ############## | |||
| # Forward output correctness | |||
| self.assertTrue(np.allclose(output, expected_output, atol=1e-6), | |||
| f"Forward output incorrect: Actual: {output}, Expected: {expected_output}") | |||
| # Backward pass correctness (for input gradients) | |||
| self.assertTrue(np.allclose(dl_dinputs, expected_dl_dinputs, atol=1e-6), | |||
| f"Inputs derivative incorrect Actual: {dl_dinputs}, Expected: {expected_dl_dinputs}") | |||
| # Check weight updates | |||
| self.assertTrue(np.allclose(layer.weights, expected_weights, atol=1e-6), | |||
| f"Weight update incorrect Actual: {layer.weights}, Expected: {expected_weights}") | |||
| # Check bias updates | |||
| self.assertTrue(np.allclose(layer.biases, expected_biases, atol=1e-6), | |||
| f"Bias update incorrect Actual: {layer.biases}, Expected: {expected_biases}") | |||
| @@ -0,0 +1,142 @@ | |||
| import unittest | |||
| import numpy as np | |||
| from neural_net.activation_layers.sigmoid_layer import SigmoidLayer | |||
| # noinspection PyMethodMayBeStatic | |||
| class SigmoidLayerTests(unittest.TestCase): | |||
| def test_sigmoid_layer_1x1(self): | |||
| ############## | |||
| # Arrange # | |||
| ############## | |||
| inputs = np.array([[1.0]]) | |||
| weights = np.array([[0.5]]) | |||
| biases = np.array([0.0]) | |||
| learning_rate = 0.001 | |||
| # Pre-activation value (z) | |||
| # This is the intermediate value calculated as the weighted sum of inputs plus the bias. | |||
| z = np.dot(inputs, weights) + biases | |||
# Output
| # The result of applying the activation function to the pre-activation value z | |||
| # Sigmoid activation formula: 1 / (1 + e^-z) | |||
| expected_output = 1 / (1 + np.exp(-z)) | |||
| # Loss gradient dL/dout | |||
| # Represents how much the loss changes when the output changes. | |||
| dL_dout = np.array([[1.0]]) | |||
| # Activation derivative dout/dz | |||
| # This tells you how much the output of the activation function changes with respect to the pre-activation value z. | |||
| # Sigmoid derivative formula: σ(z) * (1 - σ(z)) | |||
| dout_dz = expected_output * (1.0 - expected_output) | |||
| # Gradient of the loss with respect to weights (dL/dweights) | |||
| # This represents how much the loss changes when the weights change. | |||
| # Formula: dL/dweights = inputs × dL/dout × σ′(z) | |||
| expected_dl_dweights = inputs * dL_dout * dout_dz | |||
| # Gradient of the loss with respect to the bias (dL/dbias) | |||
| expected_dL_dbias = np.sum(dL_dout * dout_dz) | |||
| # Gradient of the loss with respect to inputs (dL/dinputs) | |||
| # This is the gradient of the loss with respect to the input of the neuron or layer, often needed if you want to backpropagate further. | |||
| # Formula: dL / dinputs = dL/dout × σ′(z) × weights | |||
| expected_dl_dinputs = dL_dout * dout_dz * weights | |||
| # Calculate expected new weights and biases | |||
| expected_weights = weights - learning_rate * expected_dl_dweights | |||
| expected_biases = biases - learning_rate * expected_dL_dbias | |||
# Initialize SigmoidLayer (index 0)
layer = SigmoidLayer(0, weights.shape[0], weights.shape[1], weights=weights, biases=biases)
| ############## | |||
| # Act # | |||
| ############## | |||
| # Forward pass | |||
| output = layer.forward(inputs) | |||
# Backward pass (backward returns a 5-tuple; the input gradient is first)
dl_dinputs, *_ = layer.backward(dL_dout, learning_rate)
| ############## | |||
| # Assert # | |||
| ############## | |||
| # Forward output correctness | |||
| self.assertTrue(np.allclose(output, expected_output, atol=1e-6), | |||
| f"Forward output incorrect: Actual: {output}, Expected: {expected_output}") | |||
| # Backward pass correctness | |||
| self.assertTrue(np.allclose(dl_dinputs, expected_dl_dinputs, atol=1e-6), | |||
| f"Inputs derivative incorrect Actual: {dl_dinputs}, expected: {expected_dl_dinputs}") | |||
| self.assertTrue(np.allclose(layer.weights, expected_weights, atol=1e-6), | |||
| f"Weight update incorrect Actual: {layer.weights}, expected: {expected_weights}") | |||
| self.assertTrue(np.allclose(layer.biases, expected_biases, atol=1e-6), | |||
| f"Bias update incorrect Actual: {layer.biases}, expected: {expected_biases}") | |||
| def test_sigmoid_layer_2x2(self): | |||
| ############## | |||
| # Arrange # | |||
| ############## | |||
| inputs = np.array([[1.0, 2.0], | |||
| [3.0, 4.0]]) | |||
| weights = np.array([[0.5, 0.2], | |||
| [0.3, 0.7]]) | |||
| biases = np.array([0.1, -0.1]) | |||
| learning_rate = 0.001 | |||
| # Pre-activation value (z) | |||
| # z = inputs.dot(weights) + biases | |||
| z = np.dot(inputs, weights) + biases | |||
| # Expected output using the sigmoid function | |||
| expected_output = 1 / (1 + np.exp(-z)) | |||
| # Loss gradient dL/dout (assuming a gradient of 1 for simplicity) | |||
| dL_dout = np.array([[1.0, 1.0], | |||
| [1.0, 1.0]]) | |||
| # Activation derivative dout/dz | |||
| dout_dz = expected_output * (1 - expected_output) | |||
| # Expected gradients | |||
| expected_dl_dweights = np.dot(inputs.T, dL_dout * dout_dz) | |||
| expected_dL_dbias = np.sum(dL_dout * dout_dz, axis=0) | |||
| expected_dl_dinputs = np.dot(dL_dout * dout_dz, weights.T) | |||
| # Expected updated weights and biases | |||
| expected_weights = weights - learning_rate * expected_dl_dweights | |||
| expected_biases = biases - learning_rate * expected_dL_dbias | |||
# Initialize SigmoidLayer (index 0)
layer = SigmoidLayer(0, weights.shape[0], weights.shape[1], weights=weights, biases=biases)
| ############## | |||
| # Act # | |||
| ############## | |||
| # Forward pass | |||
| output = layer.forward(inputs) | |||
# Backward pass (backward returns a 5-tuple; the input gradient is first)
dl_dinputs, *_ = layer.backward(dL_dout, learning_rate)
| ############## | |||
| # Assert # | |||
| ############## | |||
| # Forward output correctness | |||
| self.assertTrue(np.allclose(output, expected_output, atol=1e-6), | |||
| f"Forward output incorrect: Actual: {output}, Expected: {expected_output}") | |||
| # Backward pass correctness | |||
| self.assertTrue(np.allclose(dl_dinputs, expected_dl_dinputs, atol=1e-6), | |||
| f"Inputs derivative incorrect Actual: {dl_dinputs}, expected: {expected_dl_dinputs}") | |||
| self.assertTrue(np.allclose(layer.weights, expected_weights, atol=1e-6), | |||
| f"Weight update incorrect Actual: {layer.weights}, expected: {expected_weights}") | |||
| self.assertTrue(np.allclose(layer.biases, expected_biases, atol=1e-6), | |||
| f"Bias update incorrect Actual: {layer.biases}, expected: {expected_biases}") | |||
| @@ -0,0 +1,18 @@ | |||
| import tkinter as tk | |||
| from ui.app_state import AppState | |||
| from ui.front_page.front_page import FrontPage | |||
| from ui.icons import icons | |||
| class App(tk.Tk): | |||
| def __init__(self): | |||
| super().__init__() | |||
| self.app_state = AppState(auto_load=True) | |||
| icons.load_icons() | |||
| self.title("MNIST Training Center") | |||
| self.geometry("1024x720") | |||
| self.front_page = FrontPage(self, self.app_state) | |||
| self.front_page.pack(expand=1, fill="both") | |||
| @@ -0,0 +1,23 @@ | |||
| import os.path | |||
| from data.mnist_loader import MNISTModelData | |||
| from neural_net.mnist import MNISTNeuralNet | |||
| from neural_net.neural_net import NeuralNet, ModelData | |||
| class AppState: | |||
| def __init__(self, auto_load=False): | |||
| self.trainers = [] | |||
| if auto_load: | |||
| self.neural_net: NeuralNet = MNISTNeuralNet() | |||
| data_folder = "/projects/learning/datasets/minst" | |||
| self.model_data: ModelData = MNISTModelData( | |||
| os.path.join(data_folder, "train-images-idx3-ubyte"), | |||
| os.path.join(data_folder, "train-labels-idx1-ubyte"), | |||
| os.path.join(data_folder, "t10k-images-idx3-ubyte"), | |||
| os.path.join(data_folder, "t10k-labels-idx1-ubyte") | |||
| ) | |||
| self.neural_net.recalculate_accuracy(self.model_data.test_inputs, self.model_data.test_labels) | |||
| self.neural_net.recalculate_loss(self.model_data.test_inputs, self.model_data.test_labels) | |||
| else: | |||
| self.neural_net: NeuralNet = None | |||
| self.model_data: ModelData = None | |||
| @@ -0,0 +1,61 @@ | |||
| import tkinter as tk | |||
| import numpy as np | |||
| from PIL import ImageGrab, ImageTk | |||
| from PIL.Image import Resampling | |||
| class DigitDrawer(tk.Frame): | |||
| def __init__(self, parent, canvas_width, canvas_height): | |||
| super().__init__(parent) | |||
| self.canvas_width = canvas_width | |||
| self.canvas_height = canvas_height | |||
| self.brush_size = 3 | |||
| self.update_ui() | |||
| def clear_ui(self): | |||
| for widget in self.winfo_children(): | |||
| widget.destroy() | |||
| def update_ui(self): | |||
| self.clear_ui() | |||
| # Create a Canvas to draw on | |||
| self.canvas = tk.Canvas(self, width=self.canvas_width, height=self.canvas_height, bg='white') | |||
| self.canvas.pack(padx=10, pady=10) | |||
| self.canvas_demo = tk.Canvas(self, width=28, height=28, bg='white') | |||
| self.canvas_demo.pack(padx=10, pady=10) | |||
| # Clear Button | |||
| self.clear_button = tk.Button(self, text="Clear", command=self.clear_canvas) | |||
| self.clear_button.pack(expand=True, fill='both') | |||
| # Bind mouse events to draw on the canvas | |||
| self.canvas.bind("<B1-Motion>", self.paint) | |||
| def paint(self, event): | |||
| """Draw on the canvas by creating ovals (circles) at mouse position.""" | |||
| x1, y1 = (event.x - self.brush_size), (event.y - self.brush_size) | |||
| x2, y2 = (event.x + self.brush_size), (event.y + self.brush_size) | |||
| self.canvas.create_oval(x1, y1, x2, y2, fill='black', outline='black') | |||
| def clear_canvas(self): | |||
| """Clear the canvas to allow the user to draw a new digit.""" | |||
| self.canvas.delete("all") | |||
| def convert_to_array(self): | |||
| """Convert the canvas drawing to a 28x28 grayscale array.""" | |||
| # Get the canvas's pixel data and save it temporarily | |||
| x = self.winfo_rootx() + self.canvas.winfo_x() | |||
| y = self.winfo_rooty() + self.canvas.winfo_y() | |||
| x1 = x + self.canvas.winfo_width() | |||
| y1 = y + self.canvas.winfo_height() | |||
| # Capture the canvas area and convert it into a grayscale image using PIL | |||
| image = ImageGrab.grab((x, y, x1, y1)).convert("L").resize((28, 28), resample=Resampling.HAMMING) | |||
| self.demo_image = ImageTk.PhotoImage(image) | |||
| self.canvas_demo.create_image(0, 0, anchor=tk.NW, image=self.demo_image) | |||
| image_array = np.asarray(image) / 255.0 | |||
| print(np.array(image_array).reshape((28, 28))) | |||
| flat_array = image_array.flatten() | |||
| return flat_array | |||
| @@ -0,0 +1,21 @@ | |||
| import tkinter as tk | |||
| from ui.icons.icons import icons | |||
| class LabelWithRefresh(tk.Frame): | |||
| def __init__(self, parent, initial_text, callback, initial_state=tk.DISABLED): | |||
| super().__init__(parent) | |||
| self.callback = callback | |||
| self._create_ui(initial_text, initial_state) | |||
| def _create_ui(self, initial_text, initial_state): | |||
| self.refresh_button = tk.Button(self, image=icons["refresh"], state=initial_state, command=self.callback) | |||
| self.refresh_button.pack(side=tk.RIGHT, padx=5) | |||
| self.label = tk.Label(self, text=initial_text) | |||
| self.label.pack(side=tk.RIGHT, padx=5) | |||
| def set_state(self, state): | |||
| self.refresh_button.config(state=state) | |||
| def set_text(self, text): | |||
| self.label.config(text=text) | |||
| @@ -0,0 +1,14 @@ | |||
| import tkinter as tk | |||
| class NumberSlider(tk.Frame): | |||
| def __init__(self, parent, value, from_, to, resolution): | |||
| super().__init__(parent) | |||
| self.value = value | |||
| self.update_ui(from_, to, resolution) | |||
| def update_ui(self, from_, to, resolution): | |||
| self.entry = tk.Entry(self, textvariable=self.value) | |||
| self.entry.pack(side=tk.RIGHT, padx=5) | |||
self.scaler = tk.Scale(self, from_=from_, to=to, length=200, resolution=resolution,
showvalue=False, orient=tk.HORIZONTAL, sliderrelief="flat",
relief="flat", borderwidth=0, variable=self.value)
| self.scaler.set(self.value.get()) | |||
| self.scaler.pack(side=tk.RIGHT, padx=5) | |||
| @@ -0,0 +1,27 @@ | |||
| import tkinter as tk | |||
| from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg | |||
| from matplotlib.figure import Figure | |||
| from ui.plotters.plotter import Plotter | |||
| class PlotFrame(tk.Frame): | |||
| def __init__(self, parent, width=None, height=None): | |||
| super().__init__(parent, width=width, height=height) | |||
| if width is not None or height is not None: | |||
| self.pack_propagate(False) | |||
| self.figure = self.create_plot_figure() | |||
| self.plotter: Plotter = None | |||
| def create_plot_figure(self): | |||
| figure = Figure(layout="compressed", facecolor=(0,0,0)) | |||
| # Create a matplotlib canvas to display the plot | |||
| canvas = FigureCanvasTkAgg(figure, self) | |||
| canvas.draw() | |||
| (canvas.get_tk_widget() | |||
| .pack(fill=tk.BOTH, expand=False, padx=0, pady=0, ipadx=0, ipady=0)) | |||
| return figure | |||
| def update_data(self, data): | |||
|     if self.plotter is not None:  # guard: subclasses attach the plotter after construction | |||
|         self.plotter.update_plot(data) | |||
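| # A minimal concrete subclass, as a sketch: PlotFrame owns the Figure and the | |||
| # Tk canvas, and delegates drawing to a Plotter (see ui/plotters/plotter.py): | |||
| # | |||
| # class MyPlot(PlotFrame): | |||
| #     def __init__(self, parent): | |||
| #         super().__init__(parent, height=200) | |||
| #         self.plotter = MyPlotter(self.figure)  # MyPlotter: any Plotter subclass | |||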
| @@ -0,0 +1,76 @@ | |||
| import os | |||
| import tkinter as tk | |||
| from data.mnist_loader import MNISTModelData | |||
| from ui.app_state import AppState | |||
| from ui.front_page.sections.model_overview_section import NeuralNetInfo | |||
| from ui.front_page.sections.test_model_section import TestModelSection | |||
| from ui.front_page.sections.training_section import TrainingSection | |||
| class FrontPage(tk.Frame): | |||
| def __init__(self, parent, app_state: AppState): | |||
| super().__init__(parent) | |||
| self.parent = parent | |||
| self.app_state = app_state | |||
| self.main_frame = None | |||
| self.neural_net_info = None | |||
| self.model_actions_frame = None | |||
| self.start_training_section = None | |||
| self.test_model_section = None | |||
| self.training_section = None | |||
| self.create_ui() | |||
| def create_ui(self): | |||
| (tk.Label(self, text="Welcome to MNIST Learning Center", font=("Arial", 16)) | |||
| .pack(side=tk.TOP, fill=tk.BOTH, expand=False, padx=5)) | |||
| self.main_frame = tk.Frame(self) | |||
| self.main_frame.pack(fill=tk.BOTH, expand=True) | |||
| self.neural_net_info = NeuralNetInfo(self.main_frame, self.app_state, self.on_model_loaded, self.on_data_loaded) | |||
| self.neural_net_info.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=5) | |||
| self.load_model_actions_frame() | |||
| def update(self): | |||
| if self.neural_net_info is not None: | |||
| self.neural_net_info.update() | |||
| self.load_model_actions_frame() | |||
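| # Note: update() shadows tkinter's Widget.update(), so Tk's own calls to | |||
| # update() on this frame hit this override. A rename (e.g. refresh()) would | |||
| # avoid the collision; kept as-is here to match the callers in this module. | |||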
| def load_model_actions_frame(self): | |||
| if self.model_actions_frame is None and self.app_state.neural_net is not None and self.app_state.model_data is not None: | |||
| self.model_actions_frame = tk.Frame(self.main_frame) | |||
| self.model_actions_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=True, padx=5) | |||
| self.training_section = TrainingSection(self.model_actions_frame, self.app_state, self.after_training) | |||
| self.training_section.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5) | |||
| self.test_model_section = TestModelSection(self.model_actions_frame, self.app_state) | |||
| self.test_model_section.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5) | |||
| else: | |||
| if self.test_model_section is not None: | |||
| self.test_model_section.update() | |||
| if self.training_section is not None: | |||
| self.training_section.update() | |||
| def on_data_loaded(self): | |||
| print("Data loaded") | |||
| self.update() | |||
| def on_model_loaded(self): | |||
| print("Model loaded") | |||
| self.update() | |||
| def load_training_data(self): | |||
| data_folder = "/projects/learning/datasets/minst" | |||
| self.app_state.model_data = MNISTModelData( | |||
| os.path.join(data_folder, "train-images-idx3-ubyte"), | |||
| os.path.join(data_folder, "train-labels-idx1-ubyte"), | |||
| os.path.join(data_folder, "t10k-images-idx3-ubyte"), | |||
| os.path.join(data_folder, "t10k-labels-idx1-ubyte") | |||
| ) | |||
| self.update() | |||
| def after_training(self): | |||
| self.update() | |||
| @@ -0,0 +1,41 @@ | |||
| from abc import ABC | |||
| from matplotlib.figure import Figure | |||
| from neural_net.epoch import Epoch | |||
| from neural_net.neural_net import NeuralNet | |||
| from ui.components.plot_figure import PlotFrame | |||
| from ui.plotters.plotter import Plotter | |||
| class GradientsPlot(PlotFrame): | |||
| def __init__(self, parent, neural_net: NeuralNet): | |||
| super().__init__(parent) | |||
| self.plotter = GradientsPlotter(self.figure, neural_net) | |||
| class GradientsPlotter(Plotter, ABC): | |||
| def __init__(self, figure: Figure, neural_net: NeuralNet): | |||
| super().__init__(figure) | |||
| self.neural_net = neural_net | |||
| self.axes = figure.subplots(1, 2) | |||
| def reset_plot(self): | |||
| self.axes[0].clear() | |||
| self.axes[0].set_xlabel('Neuron Index') | |||
| self.axes[0].set_ylabel('Input Index') | |||
| self.axes[1].clear() | |||
| self.axes[1].set_xlabel('Output Neuron Index') | |||
| self.axes[1].set_ylabel('Hidden Neuron Index') | |||
| def plot(self, data: Epoch): | |||
| gradients_layer1 = data.layer_dl_gradients[1][-1] | |||
| self.axes[0].imshow(gradients_layer1, cmap='coolwarm', aspect='auto') | |||
| gradients_layer2 = data.layer_dl_gradients[0][-1] | |||
| self.axes[1].imshow(gradients_layer2, cmap='coolwarm', aspect='auto') | |||
| def plot_gradients_histogram(self, current_epoch: Epoch): | |||
| gradients_layer1 = current_epoch.layer_dl_gradients[1][-1] | |||
| self.axes[0].hist(gradients_layer1.flatten(), bins=50, color='blue', alpha=0.7) | |||
| gradients_layer2 = current_epoch.layer_dl_gradients[0][-1] | |||
| self.axes[1].hist(gradients_layer2.flatten(), bins=50, color='green', alpha=0.7) | |||
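| # Note: plot_gradients_histogram is never reached by Plotter.update_plot(), | |||
| # which only calls reset_plot() and plot(); it is an alternative histogram | |||
| # view that would need to be invoked from plot() (or a UI toggle) to take effect. | |||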
| @@ -0,0 +1,39 @@ | |||
| from ui.components.plot_figure import PlotFrame | |||
| import math | |||
| from abc import ABC | |||
| from matplotlib.figure import Figure | |||
| from neural_net.activation_layers.activation_layer import ActivationLayer | |||
| from neural_net.neural_net import NeuralNet | |||
| from ui.plotters.plotter import Plotter | |||
| from utils.matplotlib.utils import mpl_matshow | |||
| class LayerWeightsPlot(PlotFrame): | |||
| def __init__(self, parent, neural_net: NeuralNet, layer: ActivationLayer, rows, cols): | |||
| super().__init__(parent) | |||
| self.plotter = LayerWeightsPlotter(self.figure, neural_net, layer, rows, cols) | |||
| class LayerWeightsPlotter(Plotter, ABC): | |||
| def __init__(self, figure: Figure, neural_net: NeuralNet, layer: ActivationLayer, rows, columns): | |||
| super().__init__(figure) | |||
| self.neural_net = neural_net | |||
| self.layer = layer | |||
| self.axes = figure.subplots(nrows=rows, ncols=columns, squeeze=True, | |||
| gridspec_kw={'wspace': 0.05, 'hspace': 0.05}) | |||
| def reset_plot(self): | |||
|     for ax in self.axes.flat:  # .flat handles both 1-D and 2-D axes grids | |||
|         ax.clear() | |||
| def plot(self, data): | |||
|     weights = self.layer.weights.T | |||
|     n_neurons = weights.shape[0] | |||
|     n_pixels = weights.shape[1] | |||
|     side = int(math.sqrt(n_pixels)) | |||
|     # Draw at most rows * cols neurons; any extra grid cells stay empty | |||
|     for i in range(min(n_neurons, self.axes.size)): | |||
|         row = i // self.axes.shape[1] | |||
|         col = i % self.axes.shape[1] | |||
|         mpl_matshow(self.axes[row, col], weights[i], side) | |||
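| # The caller must pick rows/cols so that rows * cols >= n_neurons; a sketch | |||
| # of a near-square grid helper (not part of the original API): | |||
| # | |||
| # def grid_for(n_neurons): | |||
| #     cols = math.ceil(math.sqrt(n_neurons)) | |||
| #     rows = math.ceil(n_neurons / cols) | |||
| #     return rows, cols | |||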
| @@ -0,0 +1,40 @@ | |||
| from neural_net.trainer import NeuralNetTrainer | |||
| from ui.components.plot_figure import PlotFrame | |||
| from abc import ABC | |||
| from matplotlib.figure import Figure | |||
| from neural_net.neural_net import NeuralNet | |||
| from ui.plotters.plotter import Plotter | |||
| class LossPlot(PlotFrame): | |||
| def __init__(self, parent, neural_net: NeuralNet, trainer: NeuralNetTrainer): | |||
| super().__init__(parent) | |||
| self.plotter = LossPlotter(self.figure, neural_net, trainer) | |||
| class LossPlotter(Plotter, ABC): | |||
| def __init__(self, figure: Figure, neural_net: NeuralNet, trainer: NeuralNetTrainer): | |||
| super().__init__(figure) | |||
| self.neural_net = neural_net | |||
| self.trainer = trainer | |||
| self.axes = figure.add_subplot() | |||
| def reset_plot(self): | |||
| self.axes.clear() | |||
| self.axes.set_title('Loss') | |||
| self.axes.set_ylabel("Loss") | |||
| self.axes.set_xlabel("Epoch") | |||
| def plot(self, data): | |||
|     losses = [epoch.loss for epoch in self.trainer.epoch_history if epoch.finished] | |||
|     self.axes.plot(losses, marker='o', label="Loss") | |||
|     for idx, loss in enumerate(losses): | |||
|         self.axes.annotate(f"{loss:.4f}", xy=(idx, loss), rotation=45) | |||
|     self.axes.legend() | |||
|     self.axes.grid(True) | |||
| @@ -0,0 +1,37 @@ | |||
| from ui.components.plot_figure import PlotFrame | |||
| from abc import ABC | |||
| from matplotlib.figure import Figure | |||
| from ui.plotters.plotter import Plotter | |||
| class PredictionsPlot(PlotFrame): | |||
| def __init__(self, parent): | |||
| super().__init__(parent, height=32) | |||
| self.plotter = PredictionsPlotter(self.figure) | |||
| class PredictionsPlotter(Plotter, ABC): | |||
| def __init__(self, figure: Figure): | |||
| super().__init__(figure) | |||
| self.axes = figure.add_subplot() | |||
| self.clean_axes() | |||
| def plot(self, data): | |||
| self.axes.imshow(data, cmap='coolwarm', aspect='auto') | |||
| for idx in range(10): | |||
| self.axes.annotate(f"{idx}", xy=(idx - 0.2, 0.2)) | |||
| self.clean_axes() | |||
| def clean_axes(self): | |||
| # Remove axis ticks, labels, and spines | |||
| self.axes.set_xticks([]) # Remove x-ticks | |||
| self.axes.set_yticks([]) # Remove y-ticks | |||
| self.axes.spines['top'].set_visible(False) | |||
| self.axes.spines['bottom'].set_visible(False) | |||
| self.axes.spines['left'].set_visible(False) | |||
| self.axes.spines['right'].set_visible(False) | |||
| self.axes.set_facecolor((0, 0, 0)) | |||
| def reset_plot(self): | |||
| self.axes.clear() | |||
| @@ -0,0 +1,75 @@ | |||
| import os | |||
| import tkinter as tk | |||
| from data.mnist_loader import MNISTModelData | |||
| from neural_net.mnist import MNISTNeuralNet | |||
| from ui.app_state import AppState | |||
| from ui.front_page.sections.neural_net_info_widget import NeuralNetInfoWidget | |||
| class NeuralNetInfo(tk.LabelFrame): | |||
| def __init__(self, parent, app_state: AppState, on_load_model, on_load_data): | |||
| super().__init__(parent, text="Model overview") | |||
| self.app_state = app_state | |||
| self.cb_on_load_model = on_load_model | |||
| self.cb_on_load_data = on_load_data | |||
| # Status labels are only created when nothing is loaded; initialize them so | |||
| # update() can safely test them for None | |||
| self.model_status = None | |||
| self.data_status = None | |||
| self.create_ui() | |||
| def create_ui(self): | |||
| # Option to load model (could be a file dialog or dropdown in future) | |||
| self.load_model_button = tk.Button(self, text="Load model", command=self.on_load_model) | |||
| self.load_model_button.pack(padx=5, pady=5, side=tk.TOP) | |||
| if self.app_state.neural_net is None: | |||
| self.model_status = tk.Label(self, text="No model loaded") | |||
| self.model_status.pack(padx=5, pady=5) | |||
| else: | |||
| self.load_model_button.config(text="Reload model") | |||
| load_data_button = tk.Button(self, text="Load data", command=self.on_load_data) | |||
| load_data_button.pack(padx=5, pady=5, side=tk.TOP) | |||
| if self.app_state.model_data is None: | |||
| self.data_status = tk.Label(self, text="No data loaded") | |||
| self.data_status.pack(padx=5, pady=5) | |||
| else: | |||
| load_data_button.config(text="Reload data") | |||
| self.neural_net_info = NeuralNetInfoWidget(self, self.app_state) | |||
| self.neural_net_info.pack(padx=5, pady=5) | |||
| def update(self): | |||
|     # Rebuild the whole section: the previous version appended a fresh | |||
|     # "Load data" button and info widget on every call, stacking duplicates | |||
|     for widget in self.winfo_children(): | |||
|         widget.destroy() | |||
|     self.create_ui() | |||
| def on_load_data(self): | |||
| data_folder = "/projects/learning/datasets/minst" | |||
| self.app_state.model_data = MNISTModelData( | |||
| os.path.join(data_folder, "train-images-idx3-ubyte"), | |||
| os.path.join(data_folder, "train-labels-idx1-ubyte"), | |||
| os.path.join(data_folder, "t10k-images-idx3-ubyte"), | |||
| os.path.join(data_folder, "t10k-labels-idx1-ubyte") | |||
| ) | |||
| if self.app_state.neural_net is not None: | |||
| self.app_state.neural_net.recalculate_loss(self.app_state.model_data.test_inputs, self.app_state.model_data.test_labels) | |||
| self.app_state.neural_net.recalculate_accuracy(self.app_state.model_data.test_inputs, self.app_state.model_data.test_labels) | |||
| if self.cb_on_load_data is not None: | |||
| self.cb_on_load_data() | |||
| def on_load_model(self): | |||
| self.app_state.neural_net = MNISTNeuralNet() | |||
| if self.app_state.model_data is not None: | |||
| self.app_state.neural_net.recalculate_loss(self.app_state.model_data.test_inputs, self.app_state.model_data.test_labels) | |||
| self.app_state.neural_net.recalculate_accuracy(self.app_state.model_data.test_inputs, self.app_state.model_data.test_labels) | |||
| if self.cb_on_load_model is not None: | |||
| self.cb_on_load_model() | |||
| @@ -0,0 +1,53 @@ | |||
| import tkinter as tk | |||
| from ui.app_state import AppState | |||
| from ui.components.label_with_refresh import LabelWithRefresh | |||
| class NeuralNetInfoWidget(tk.Frame): | |||
| def __init__(self, parent, app_state: AppState): | |||
| super().__init__(parent) | |||
| self.app_state = app_state | |||
| self.update_ui() | |||
| def clear_ui(self): | |||
| for widget in self.winfo_children(): | |||
| widget.destroy() | |||
| def update_ui(self): | |||
| self.clear_ui() | |||
| row = 0 | |||
| if self.app_state.neural_net is not None: | |||
| for layer in self.app_state.neural_net.layers: | |||
| (tk.Label(self, text=f"{layer.type} {layer.index}") | |||
| .grid(column=0, row=row, padx=10, pady=5, sticky='w')) | |||
| tk.Label(self, text=f"{layer.input_dim} -> {layer.output_dim} neurons").grid(column=1, row=row, padx=10, pady=5, sticky='e') | |||
| row += 1 | |||
| button_state = tk.DISABLED | |||
| if self.app_state.model_data is not None: | |||
| button_state = tk.NORMAL | |||
| tk.Label(self, text="Accuracy:").grid(column=0, row=row, padx=10, pady=5, sticky='w') | |||
| last_accuracy = "NA" | |||
| if self.app_state.neural_net.last_accuracy is not None: | |||
| last_accuracy = f"{self.app_state.neural_net.last_accuracy * 100:.2f}%" | |||
| self.accuracy_label = LabelWithRefresh(self, last_accuracy, callback=self.recalculate_accuracy, initial_state=button_state) | |||
| self.accuracy_label.grid(column=1, row=row, padx=10, pady=5, sticky='e') | |||
| row += 1 | |||
| tk.Label(self, text="Current Loss:").grid(column=0, row=row, padx=10, pady=5, sticky='w') | |||
| last_loss = "NA" | |||
| if self.app_state.neural_net.last_loss is not None: | |||
| last_loss = f"{self.app_state.neural_net.last_loss:.4f}" | |||
| self.loss_label = LabelWithRefresh(self, last_loss, callback=self.recalculate_loss, initial_state=button_state) | |||
| self.loss_label.grid(column=1, row=row, padx=10, pady=5, sticky='e') | |||
| row += 1 | |||
| def recalculate_accuracy(self): | |||
| self.app_state.neural_net.recalculate_accuracy(self.app_state.model_data.test_inputs, self.app_state.model_data.test_labels) | |||
| self.update_ui() | |||
| def recalculate_loss(self): | |||
| self.app_state.neural_net.recalculate_loss(self.app_state.model_data.test_inputs, self.app_state.model_data.test_labels) | |||
| self.update_ui() | |||
| @@ -0,0 +1,42 @@ | |||
| import tkinter as tk | |||
| from ui.app_state import AppState | |||
| from ui.components.digit_drawer import DigitDrawer | |||
| from ui.front_page.plots.predictions import PredictionsPlot | |||
| class TestModelSection(tk.LabelFrame): | |||
| def __init__(self, parent, app_state: AppState): | |||
| super().__init__(parent, text="Model testing") | |||
| self.app_state = app_state | |||
| self.update_ui() | |||
| def clear_ui(self): | |||
| for widget in self.winfo_children(): | |||
| widget.destroy() | |||
| def update_ui(self): | |||
| self.clear_ui() | |||
| self.digit_drawer = DigitDrawer(self, 100, 100) | |||
| self.digit_drawer.pack(fill=tk.BOTH, expand=True) | |||
| # Predict Button (converts drawing to 28x28 and shows the array) | |||
| self.predict_button = tk.Button(self, text="Predict", command=self.predict_number) | |||
| self.predict_button.pack(fill=tk.BOTH, expand=True) | |||
| frame_prediction = tk.Frame(self, height=200) | |||
| frame_prediction.pack(fill=tk.BOTH, expand=True) | |||
| (tk.Label(frame_prediction, text="Prediction: ") | |||
| .pack(side=tk.LEFT)) | |||
| self.lbl_prediction = tk.Label(frame_prediction, text="/") | |||
| self.lbl_prediction.pack(side=tk.LEFT) | |||
| self.prediction_plot = PredictionsPlot(self) | |||
| self.prediction_plot.pack(side=tk.BOTTOM, anchor=tk.S, fill=tk.X, expand=True) | |||
| def predict_number(self): | |||
| inputs = self.digit_drawer.convert_to_array() | |||
| raw_predictions, predictions = self.app_state.neural_net.predict([inputs]) | |||
| print(predictions) | |||
| self.lbl_prediction.config(text=f"{predictions[0]}") | |||
| self.prediction_plot.update_data(raw_predictions) | |||
| @@ -0,0 +1,43 @@ | |||
| import tkinter as tk | |||
| from neural_net.epoch import Epoch | |||
| class EpochInformation(tk.LabelFrame): | |||
| def __init__(self, parent, epoch: Epoch): | |||
| super().__init__(parent, text="Last epoch info") | |||
| self.epoch = epoch | |||
| self.lbl_epoch_training_time = None | |||
| self.lbl_last_loss = None | |||
| self.lbl_learning_rate = None | |||
| self.create_ui() | |||
| def create_ui(self): | |||
| row = 0 | |||
| tk.Label(self, text="Duration:", anchor=tk.W).grid(column=0, row=row, | |||
| sticky=tk.E, | |||
| padx=(10, 20), pady=5) | |||
| self.lbl_epoch_training_time = tk.Label(self, text=f"{self.epoch.duration:.2f}sec") | |||
| self.lbl_epoch_training_time.grid(column=1, row=row, sticky=tk.E, padx=10, pady=5) | |||
| row += 1 | |||
| tk.Label(self, text="Loss value:", anchor=tk.W).grid(column=0, row=row, | |||
| sticky=tk.E, padx=(10, 20), | |||
| pady=5) | |||
| self.lbl_last_loss = tk.Label(self, text=f"{self.epoch.loss:.4f}") | |||
| self.lbl_last_loss.grid(column=1, row=row, sticky=tk.E, padx=10, pady=5) | |||
| row += 1 | |||
| tk.Label(self, text="Learning rate:", anchor=tk.W).grid(column=0, row=row, | |||
| sticky=tk.E, padx=(10, 20), | |||
| pady=5) | |||
| self.lbl_learning_rate = tk.Label(self, text=f"{self.epoch.learning_rate:.4f}") | |||
| self.lbl_learning_rate.grid(column=1, row=row, sticky=tk.E, padx=10, pady=5) | |||
| def update(self): | |||
| print(f"Updating training data for epoch {self.epoch}") | |||
| self.lbl_epoch_training_time.config(text=f"{self.epoch.duration:.2f}sec") | |||
| self.lbl_last_loss.config(text=f"{self.epoch.loss:.4f}") | |||
| self.lbl_learning_rate.config(text=f"{self.epoch.learning_rate:.4f}") | |||
| def set_epoch(self, epoch: Epoch): | |||
| self.epoch = epoch | |||
| self.update() | |||
| @@ -0,0 +1,79 @@ | |||
| import threading | |||
| import tkinter as tk | |||
| from neural_net.trainer import NeuralNetTrainer | |||
| from ui.app_state import AppState | |||
| from ui.components.number_slider import NumberSlider | |||
| from ui.front_page.sections.training_information import EpochInformation | |||
| class TrainingSection(tk.LabelFrame): | |||
| def __init__(self, parent, app_state: AppState, on_update_neural_net_info): | |||
| super().__init__(parent, text="Model training") | |||
| self.app_state = app_state | |||
| self.on_update_neural_net_info = on_update_neural_net_info | |||
| self.batch_size = tk.IntVar() | |||
| self.batch_size.set(1000) | |||
| self.batch_size_slider = None | |||
| self.learning_rate = tk.DoubleVar() | |||
| self.learning_rate.set(0.0001) | |||
| self.learning_rate_slider = None | |||
| self.btn_start_stop = None | |||
| self.stop_button = None | |||
| self.training_information_container: EpochInformation = None | |||
| # Keyword arguments to match the NeuralNetTrainer signature used in training_page.py | |||
| self.trainer: NeuralNetTrainer = NeuralNetTrainer(self.app_state.neural_net, self.app_state.model_data, | |||
|                                                   learning_rate=self.learning_rate.get(), | |||
|                                                   batch_size=self.batch_size.get()) | |||
| self.create_ui() | |||
| def create_ui(self): | |||
| tk.Label(self, text="Batch size:").grid(column=0, row=0, padx=10, pady=5, sticky='w') | |||
| self.batch_size_slider = NumberSlider(self, self.batch_size, from_=100, to=10000, resolution=1) | |||
| self.batch_size_slider.grid(column=1, row=0, padx=10, pady=5, sticky='w') | |||
| tk.Label(self, text="Learning rate:").grid(column=0, row=1, padx=10, pady=5, sticky='w') | |||
| self.learning_rate_slider = NumberSlider(self, self.learning_rate, from_=0.0001, to=0.1, resolution=0.0001) | |||
| self.learning_rate_slider.grid(column=1, row=1, padx=10, pady=5, sticky='w') | |||
| self.btn_prev_epoch = tk.Button(self, text="<<", command=self.on_prev_epoch) | |||
| self.btn_prev_epoch.grid(column=0, row=2, padx=10, pady=10, sticky='w') | |||
| self.btn_start_stop = tk.Button(self, text="Start", command=self.toggle_state) | |||
| self.btn_start_stop.grid(column=1, row=2, padx=10, pady=10, sticky='w') | |||
| self.btn_next_epoch = tk.Button(self, text=">>", command=self.on_next_epoch) | |||
| self.btn_next_epoch.grid(column=2, row=2, padx=10, pady=10, sticky='w') | |||
| def update(self): | |||
|     if self.trainer.is_running: | |||
|         self.btn_start_stop.config(text="Stop") | |||
|         if not self.trainer.epoch_history: | |||
|             return  # no finished epoch to display yet | |||
|         if self.training_information_container is None: | |||
|             self.training_information_container = EpochInformation(self, self.trainer.epoch_history[-1]) | |||
|             self.training_information_container.grid(column=0, row=5, padx=10, pady=10, sticky='e') | |||
|         else: | |||
|             self.training_information_container.set_epoch(self.trainer.epoch_history[-1]) | |||
|     else: | |||
|         self.btn_start_stop.config(text="Start") | |||
| def toggle_state(self): | |||
| if self.trainer.is_running: | |||
| self.trainer.stop() | |||
| else: | |||
| self.thread = threading.Thread(target=self.trainer.start, args=(self.on_epoch_finish, self.on_update_neural_net_info)) | |||
| self.thread.start() | |||
| # self.trainer.start(self.on_epoch_finish, self.on_update_neural_net_info) | |||
| self.update() | |||
| def start(self): | |||
| self.thread = threading.Thread(target=self.trainer.start) | |||
| self.thread.start() | |||
| # self.trainer.start(on_epoch_finished=self.update_training_data) | |||
| def on_epoch_finish(self, epoch): | |||
| print("Updating the epoch") | |||
| self.update() | |||
| def on_prev_epoch(self): | |||
| pass | |||
| def on_next_epoch(self): | |||
| pass | |||
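| # Note: on_epoch_finish runs on the trainer's worker thread, and Tkinter | |||
| # widgets are not thread-safe. A sketch of the usual fix is to marshal the | |||
| # update back onto the Tk main loop: | |||
| # | |||
| # def on_epoch_finish(self, epoch): | |||
| #     self.after(0, self.update)  # schedule update() on the main thread | |||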
| @@ -0,0 +1,14 @@ | |||
| from PIL import Image, ImageTk | |||
| from PIL.Image import Resampling | |||
| icons = {} | |||
| def _load_icon(path, size): | |||
| img = Image.open(path) | |||
| img = img.resize(size, resample=Resampling.HAMMING) | |||
| return ImageTk.PhotoImage(img) | |||
| def load_icons(): | |||
| icons["refresh"] = _load_icon("ui/icons/refresh.png", (24, 24)) | |||
| @@ -0,0 +1,7 @@ | |||
| from abc import ABC | |||
| from matplotlib.figure import Figure | |||
| from neural_net.epoch import Epoch | |||
| from neural_net.neural_net import NeuralNet | |||
| from ui.plotters.plotter import Plotter | |||
| @@ -0,0 +1 @@ | |||
| @@ -0,0 +1,30 @@ | |||
| from abc import ABC, abstractmethod | |||
| from matplotlib.figure import Figure | |||
| from neural_net.epoch import Epoch | |||
| class Plotter(ABC): | |||
|     def __init__(self, figure: Figure): | |||
|         self.figure = figure | |||
|     def initialize_plots(self): | |||
|         self.figure.show() | |||
|     def update_plot(self, data): | |||
|         # Template method: concrete plotters supply reset_plot() and plot(). | |||
|         # update_plot() has a body and is called directly, so it must not be | |||
|         # marked @abstractmethod. | |||
|         self.reset_plot() | |||
|         self.plot(data) | |||
|         self.figure.canvas.draw() | |||
|         self.figure.canvas.flush_events() | |||
|     @abstractmethod | |||
|     def reset_plot(self): | |||
|         pass | |||
|     @abstractmethod | |||
|     def plot(self, current_epoch: Epoch): | |||
|         pass | |||
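| # A minimal concrete plotter, as a sketch of the contract above: | |||
| # | |||
| # class ScalarPlotter(Plotter): | |||
| #     def __init__(self, figure: Figure): | |||
| #         super().__init__(figure) | |||
| #         self.ax = figure.add_subplot() | |||
| #     def reset_plot(self): | |||
| #         self.ax.clear() | |||
| #     def plot(self, values): | |||
| #         self.ax.plot(values) | |||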
| @@ -0,0 +1,103 @@ | |||
| import threading | |||
| import tkinter as tk | |||
| from tkinter import ttk  # Notebook lives in ttk, not in plain tkinter | |||
| from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg | |||
| from matplotlib.figure import Figure | |||
| from neural_net.epoch import Epoch | |||
| from neural_net.trainer import NeuralNetTrainer | |||
| from ui.app_state import AppState | |||
| from ui.front_page.plots.gradients import GradientsPlot | |||
| from ui.front_page.plots.layer_weights import LayerWeightsPlot | |||
| from ui.front_page.plots.loss import LossPlot | |||
| from ui.front_page.sections.training_information import EpochInformation | |||
| class TrainingPage(tk.Frame): | |||
| def __init__(self, parent, app_state: AppState, on_training_finished=None): | |||
| super().__init__(parent) | |||
| self.app_state = app_state | |||
| self.on_training_finished = on_training_finished | |||
| self.trainer: NeuralNetTrainer = None  # created in start() | |||
| # trainer = NeuralNetTrainer(self.app_state.neural_net, self.app_state.model_data, learning_rate, nr_epochs) | |||
| # self.app_state.trainers.append(trainer) | |||
| # self.trainer = trainer | |||
| self.create_ui() | |||
| def start(self, learning_rate, nr_epochs, batch_size, callback=None): | |||
|     if self.trainer is not None: | |||
|         self.trainer.stop() | |||
|     self.trainer = NeuralNetTrainer(self.app_state.neural_net, self.app_state.model_data, | |||
|                                     learning_rate=learning_rate, nr_epochs=nr_epochs, batch_size=batch_size, | |||
|                                     on_epoch_callback=self.update_training_data, | |||
|                                     on_finished_callback=self.on_training_finished) | |||
|     # Keep the loss plotter pointing at the active trainer (it reads epoch_history) | |||
|     self.loss_plot.plotter.trainer = self.trainer | |||
|     self.thread = threading.Thread(target=self.trainer.start) | |||
|     self.thread.start() | |||
|     if callback is not None: | |||
|         callback() | |||
| def update_training_data(self, training_run, data: Epoch): | |||
|     print(f"Updating training data {data.epoch}") | |||
|     if not self.trainer.is_running: | |||
|         return | |||
|     # EpochInformation needs an Epoch, so it is created lazily on the first callback | |||
|     if self.training_information_container is None: | |||
|         self.training_information_container = EpochInformation(self, data) | |||
|         self.training_information_container.pack(side=tk.TOP, fill=tk.X, expand=False, | |||
|                                                  pady=10, padx=10, ipady=10, ipadx=10) | |||
|     else: | |||
|         self.training_information_container.set_epoch(data) | |||
|     # PlotFrame exposes update_data(data), which forwards to its Plotter | |||
|     self.loss_plot.update_data(data) | |||
|     if data.epoch % 5 == 0: | |||
|         self.gradients_plot.update_data(data) | |||
|         self.layer0_weights_plot.update_data(data) | |||
|         self.layer1_weights_plot.update_data(data) | |||
| def create_ui(self): | |||
| # Training center: EpochInformation requires an Epoch, so the panel is | |||
| # created lazily in update_training_data() once the first epoch arrives | |||
| self.training_information_container = None | |||
| actions_frame = tk.Frame(self) | |||
| actions_frame.pack(side=tk.TOP, fill=tk.X, expand=False, pady=10, padx=10, ipady=10, ipadx=10) | |||
| btn_text = "Pause" | |||
| self.btn_toggle_pause = tk.Button(actions_frame, text=btn_text, command=self.toggle_state) | |||
| self.btn_toggle_pause.pack(side=tk.LEFT) | |||
| btn_stop = tk.Button(actions_frame, text="Stop", | |||
|                      command=lambda: self.trainer.stop() if self.trainer is not None else None) | |||
| btn_stop.pack(side=tk.LEFT) | |||
| # Plot tabs | |||
| plot_tab_control = ttk.Notebook(self) | |||
| plot_tab_control.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=True, pady=0, padx=0, ipady=0, ipadx=0) | |||
| self.loss_plot = LossPlot(plot_tab_control, self.app_state.neural_net, self.trainer)  # trainer is re-attached in start() | |||
| plot_tab_control.add(self.loss_plot, text="Loss Function") | |||
| self.gradients_plot = GradientsPlot(plot_tab_control, self.app_state.neural_net) | |||
| plot_tab_control.add(self.gradients_plot, text="Gradients") | |||
| self.layer0_weights_plot = LayerWeightsPlot(plot_tab_control, self.app_state.neural_net, | |||
| self.app_state.neural_net.layers[0], | |||
| 11, 11) | |||
| plot_tab_control.add(self.layer0_weights_plot, text="Weights layer 0") | |||
| self.layer1_weights_plot = LayerWeightsPlot(plot_tab_control, self.app_state.neural_net, | |||
| self.app_state.neural_net.layers[1], | |||
| 2, 5) | |||
| plot_tab_control.add(self.layer1_weights_plot, text="Weights layer 1") | |||
| def toggle_state(self): | |||
|     if self.trainer is None: | |||
|         return  # nothing to pause before start() has run | |||
|     self.trainer.toggle_state() | |||
|     if self.trainer.training_paused: | |||
|         self.btn_toggle_pause.config(text="Resume") | |||
|     else: | |||
|         self.btn_toggle_pause.config(text="Pause") | |||
| @staticmethod | |||
| def create_plot_figure(tab): | |||
| figure = Figure() | |||
| # Create a matplotlib canvas to display the plot | |||
| canvas = FigureCanvasTkAgg(figure, tab) | |||
| canvas.draw() | |||
| canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) | |||
| return figure | |||
| @@ -0,0 +1,10 @@ | |||
| def force_aspect(ax, aspect=1): | |||
|     """Force the given aspect ratio on an axes that displays an image.""" | |||
|     im = ax.get_images() | |||
|     extent = im[0].get_extent() | |||
|     ax.set_aspect(abs((extent[1] - extent[0]) / (extent[3] - extent[2])) / aspect) | |||
| def mpl_matshow(ax, data, shape): | |||
|     """Render a flat vector as a shape x shape matrix without axis ticks.""" | |||
|     ax.matshow(data.reshape(shape, shape), cmap='hot', aspect='auto') | |||
|     ax.set_xticks([]) | |||
|     ax.set_yticks([]) | |||
|     force_aspect(ax) | |||
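| # Usage sketch: render one flat 784-weight vector as a 28x28 image | |||
| # (assumes `weights` is a numpy array of length 784 and matplotlib.pyplot | |||
| # is imported as plt): | |||
| # | |||
| # fig, ax = plt.subplots() | |||
| # mpl_matshow(ax, weights, 28) | |||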