From 1e32b0adb2fdc003f6012667d3d9d2e808262681 Mon Sep 17 00:00:00 2001 From: igor-davidyuk <“iprotonv@protonmail.ch”> Date: Fri, 18 Mar 2022 18:00:43 +0300 Subject: [PATCH] squashed commits. Kvasir + LinReg experiments --- .../envoy/kvasir_shard_descriptor.py | 3 + .../PyTorch_Kvasir_UNet.ipynb | 575 ++++++++++++ .../workspace_corruption_experiment/layers.py | 100 +++ .../start_federation.ipynb | 824 ++++++++++++++++++ .../envoy/linreg_shard_descriptor.py | 5 +- .../workspace_corrupt_envoy/LinReg.ipynb | 515 +++++++++++ .../workspace_corrupt_envoy/custom_adapter.py | 21 + .../workspace_corrupt_envoy/requirements.txt | 4 + .../start_federation.ipynb | 232 +++++ .../interface/interactive_api/experiment.py | 7 +- openfl/utilities/dataset_spoilers/__init__.py | 14 + .../dataset_spoilers/dataset_spoil_methods.py | 34 + .../dataset_spoilers/shard_corruptor.py | 37 + 13 files changed, 2367 insertions(+), 4 deletions(-) create mode 100644 openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/PyTorch_Kvasir_UNet.ipynb create mode 100644 openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/layers.py create mode 100644 openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/start_federation.ipynb create mode 100644 openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/LinReg.ipynb create mode 100644 openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/custom_adapter.py create mode 100644 openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/requirements.txt create mode 100644 openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/start_federation.ipynb create mode 100644 openfl/utilities/dataset_spoilers/__init__.py create mode 100644 openfl/utilities/dataset_spoilers/dataset_spoil_methods.py create mode 100644 openfl/utilities/dataset_spoilers/shard_corruptor.py diff --git 
a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/kvasir_shard_descriptor.py b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/kvasir_shard_descriptor.py index 1107d2decf..e448e9a19a 100644 --- a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/kvasir_shard_descriptor.py +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/kvasir_shard_descriptor.py @@ -11,6 +11,8 @@ from openfl.interface.interactive_api.shard_descriptor import ShardDataset from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor from openfl.utilities import validate_file_hash +from openfl.utilities.dataset_spoilers import corrupt_shard +from openfl.utilities.dataset_spoilers import spoil_targets_random_choice class KvasirShardDataset(ShardDataset): @@ -55,6 +57,7 @@ def __len__(self): return len(self.images_names) +@corrupt_shard(spoil_targets_random_choice) class KvasirShardDescriptor(ShardDescriptor): """Shard descriptor class.""" diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/PyTorch_Kvasir_UNet.ipynb b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/PyTorch_Kvasir_UNet.ipynb new file mode 100644 index 0000000000..2988bd9c21 --- /dev/null +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/PyTorch_Kvasir_UNet.ipynb @@ -0,0 +1,575 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "liquid-jacket", + "metadata": {}, + "source": [ + "# Federated Kvasir with Director example" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "alike-sharing", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Requirement already satisfied: torchvision==0.8.1 in /home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages (0.8.1)\n", + "Requirement already satisfied: pillow>=4.1.1 in 
/home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages (from torchvision==0.8.1) (9.0.1)\n", + "Requirement already satisfied: torch==1.7.0 in /home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages (from torchvision==0.8.1) (1.7.0)\n", + "Requirement already satisfied: numpy in /home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages (from torchvision==0.8.1) (1.22.2)\n", + "Requirement already satisfied: future in /home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages (from torch==1.7.0->torchvision==0.8.1) (0.18.2)\n", + "Requirement already satisfied: dataclasses in /home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages (from torch==1.7.0->torchvision==0.8.1) (0.6)\n", + "Requirement already satisfied: typing-extensions in /home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages (from torch==1.7.0->torchvision==0.8.1) (3.10.0.2)\n", + "\u001b[33mWARNING: You are using pip version 22.0.3; however, version 22.0.4 is available.\n", + "You should consider upgrading via the '/home/idavidyu/.virtualenvs/corrupt-envoy/bin/python -m pip install --upgrade pip' command.\u001b[0m\u001b[33m\n", + "\u001b[0m" + ] + } + ], + "source": [ + "# Install dependencies if not already installed\n", + "!pip install torchvision==0.8.1" + ] + }, + { + "cell_type": "markdown", + "id": "16986f22", + "metadata": {}, + "source": [ + "# Connect to the Federation" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "4485ac79", + "metadata": {}, + "outputs": [], + "source": [ + "# Create a federation\n", + "from openfl.interface.interactive_api.federation import Federation\n", + "\n", + "# please use the same identificator that was used in signed certificate\n", + "client_id = 'frontend'\n", + "director_node_fqdn = 'localhost'\n", + "director_port = 50050\n", + "\n", + "# Run with TLS disabled (trusted environment)\n", + "# Federation can also determine local fqdn automatically\n", + 
"federation = Federation(\n", + " client_id=client_id,\n", + " director_node_fqdn=director_node_fqdn,\n", + " director_port=director_port,\n", + " tls=False\n", + ")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "e35802d5", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "data": { + "text/plain": [ + "{'env_3': {'shard_info': node_info {\n", + " name: \"env_3\"\n", + " cuda_devices {\n", + " index: 2\n", + " memory_total: 11554717696\n", + " memory_utilized: 6225920\n", + " device_utilization: \"0%\"\n", + " cuda_driver_version: \"470.57.02\"\n", + " cuda_version: \"11.4\"\n", + " name: \"NVIDIA GeForce RTX 2080 Ti\"\n", + " }\n", + " }\n", + " shard_description: \"Kvasir dataset, shard number 3 out of 3\"\n", + " sample_shape: \"300\"\n", + " sample_shape: \"400\"\n", + " sample_shape: \"3\"\n", + " target_shape: \"300\"\n", + " target_shape: \"400\",\n", + " 'is_online': True,\n", + " 'is_experiment_running': False,\n", + " 'last_updated': '2022-03-14 17:14:06',\n", + " 'current_time': '2022-03-14 17:14:08',\n", + " 'valid_duration': seconds: 10,\n", + " 'experiment_name': 'ExperimentName Mock'},\n", + " 'env_1': {'shard_info': node_info {\n", + " name: \"env_1\"\n", + " cuda_devices {\n", + " memory_total: 11554717696\n", + " memory_utilized: 6225920\n", + " device_utilization: \"0%\"\n", + " cuda_driver_version: \"470.57.02\"\n", + " cuda_version: \"11.4\"\n", + " name: \"NVIDIA GeForce RTX 2080 Ti\"\n", + " }\n", + " }\n", + " shard_description: \"Kvasir dataset, shard number 1 out of 3\"\n", + " sample_shape: \"300\"\n", + " sample_shape: \"400\"\n", + " sample_shape: \"3\"\n", + " target_shape: \"300\"\n", + " target_shape: \"400\",\n", + " 'is_online': True,\n", + " 'is_experiment_running': False,\n", + " 'last_updated': '2022-03-14 17:14:06',\n", + " 'current_time': '2022-03-14 17:14:08',\n", + " 'valid_duration': seconds: 10,\n", + " 'experiment_name': 'ExperimentName Mock'},\n", + " 'env_2': {'shard_info': node_info 
{\n", + " name: \"env_2\"\n", + " cuda_devices {\n", + " index: 1\n", + " memory_total: 11546394624\n", + " memory_utilized: 49283072\n", + " device_utilization: \"0%\"\n", + " cuda_driver_version: \"470.57.02\"\n", + " cuda_version: \"11.4\"\n", + " name: \"NVIDIA GeForce RTX 2080 Ti\"\n", + " }\n", + " }\n", + " shard_description: \"Kvasir dataset, shard number 2 out of 3\"\n", + " sample_shape: \"300\"\n", + " sample_shape: \"400\"\n", + " sample_shape: \"3\"\n", + " target_shape: \"300\"\n", + " target_shape: \"400\",\n", + " 'is_online': True,\n", + " 'is_experiment_running': False,\n", + " 'last_updated': '2022-03-14 17:14:06',\n", + " 'current_time': '2022-03-14 17:14:08',\n", + " 'valid_duration': seconds: 10,\n", + " 'experiment_name': 'ExperimentName Mock'}}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "federation.get_shard_registry()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "67ae50de", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['300', '400']" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "federation.target_shape" + ] + }, + { + "cell_type": "markdown", + "id": "obvious-tyler", + "metadata": {}, + "source": [ + "## Creating a FL experiment using Interactive API" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "rubber-address", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. 
See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "source": [ + "from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment" + ] + }, + { + "cell_type": "markdown", + "id": "sustainable-public", + "metadata": {}, + "source": [ + "### Register dataset" + ] + }, + { + "cell_type": "markdown", + "id": "unlike-texas", + "metadata": {}, + "source": [ + "We extract User dataset class implementation.\n", + "Is it convinient?\n", + "What if the dataset is not a class?" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "64f37dcf", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import PIL\n", + "import numpy as np\n", + "from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler\n", + "from torchvision import transforms as tsf\n", + "\n", + "\n", + "class KvasirShardDataset(Dataset):\n", + " \n", + " def __init__(self, dataset):\n", + " self._dataset = dataset\n", + " \n", + " # Prepare transforms\n", + " self.img_trans = tsf.Compose([\n", + " tsf.ToPILImage(),\n", + " tsf.Resize((332, 332)),\n", + " tsf.ToTensor(),\n", + " tsf.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])\n", + " self.mask_trans = tsf.Compose([\n", + " tsf.ToPILImage(),\n", + " tsf.Resize((332, 332), interpolation=PIL.Image.NEAREST),\n", + " tsf.ToTensor()])\n", + " \n", + " def __getitem__(self, index):\n", + " img, mask = self._dataset[index]\n", + " img = self.img_trans(img).numpy()\n", + " mask = self.mask_trans(mask).numpy()\n", + " return img, mask\n", + " \n", + " def __len__(self):\n", + " return len(self._dataset)\n", + "\n", + " \n", + "\n", + "# Now you can implement you data loaders using dummy_shard_desc\n", + "class KvasirSD(DataInterface):\n", + "\n", + " def __init__(self, validation_fraction=1/8, **kwargs):\n", + " super().__init__(**kwargs)\n", + " \n", + " self.validation_fraction = 
validation_fraction\n", + " \n", + " @property\n", + " def shard_descriptor(self):\n", + " return self._shard_descriptor\n", + " \n", + " @shard_descriptor.setter\n", + " def shard_descriptor(self, shard_descriptor):\n", + " \"\"\"\n", + " Describe per-collaborator procedures or sharding.\n", + "\n", + " This method will be called during a collaborator initialization.\n", + " Local shard_descriptor will be set by Envoy.\n", + " \"\"\"\n", + " self._shard_descriptor = shard_descriptor\n", + " self._shard_dataset = KvasirShardDataset(shard_descriptor.get_dataset('train'))\n", + " \n", + " validation_size = max(1, int(len(self._shard_dataset) * self.validation_fraction))\n", + " \n", + " self.train_indeces = np.arange(len(self._shard_dataset) - validation_size)\n", + " self.val_indeces = np.arange(len(self._shard_dataset) - validation_size, len(self._shard_dataset))\n", + " \n", + " def get_train_loader(self, **kwargs):\n", + " \"\"\"\n", + " Output of this method will be provided to tasks with optimizer in contract\n", + " \"\"\"\n", + " train_sampler = SubsetRandomSampler(self.train_indeces)\n", + " return DataLoader(\n", + " self._shard_dataset,\n", + " num_workers=8,\n", + " batch_size=self.kwargs['train_bs'],\n", + " sampler=train_sampler\n", + " )\n", + "\n", + " def get_valid_loader(self, **kwargs):\n", + " \"\"\"\n", + " Output of this method will be provided to tasks without optimizer in contract\n", + " \"\"\"\n", + " val_sampler = SubsetRandomSampler(self.val_indeces)\n", + " return DataLoader(\n", + " self._shard_dataset,\n", + " num_workers=8,\n", + " batch_size=self.kwargs['valid_bs'],\n", + " sampler=val_sampler\n", + " )\n", + "\n", + " def get_train_data_size(self):\n", + " \"\"\"\n", + " Information for aggregation\n", + " \"\"\"\n", + " return len(self.train_indeces)\n", + "\n", + " def get_valid_data_size(self):\n", + " \"\"\"\n", + " Information for aggregation\n", + " \"\"\"\n", + " return len(self.val_indeces)" + ] + }, + { + "cell_type": 
"markdown", + "id": "caring-distinction", + "metadata": {}, + "source": [ + "### Describe a model and optimizer" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "visible-victor", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/idavidyu/.virtualenvs/corrupt-envoy/lib/python3.8/site-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "source": [ + "import torch\n", + "import torch.nn as nn\n", + "import torch.optim as optim" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "foreign-gospel", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "UNet model definition\n", + "\"\"\"\n", + "from layers import soft_dice_coef, soft_dice_loss, DoubleConv, Down, Up\n", + "\n", + "\n", + "class UNet(nn.Module):\n", + " def __init__(self, n_channels=3, n_classes=1):\n", + " super().__init__()\n", + " self.inc = DoubleConv(n_channels, 64)\n", + " self.down1 = Down(64, 128)\n", + " self.down2 = Down(128, 256)\n", + " self.down3 = Down(256, 512)\n", + " self.up1 = Up(512, 256)\n", + " self.up2 = Up(256, 128)\n", + " self.up3 = Up(128, 64)\n", + " self.outc = nn.Conv2d(64, n_classes, 1)\n", + "\n", + " def forward(self, x):\n", + " x1 = self.inc(x)\n", + " x2 = self.down1(x1)\n", + " x3 = self.down2(x2)\n", + " x4 = self.down3(x3)\n", + " x = self.up1(x4, x3)\n", + " x = self.up2(x, x2)\n", + " x = self.up3(x, x1)\n", + " x = self.outc(x)\n", + " x = torch.sigmoid(x)\n", + " return x\n", + " \n", + "model_unet = UNet()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "greater-activation", + "metadata": {}, + "outputs": [], + "source": [ + "optimizer_adam = optim.Adam(model_unet.parameters(), lr=1e-4,)" + ] + }, + { + "cell_type": "markdown", + "id": "caroline-passion", + 
"metadata": {}, + "source": [ + "#### Register model" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "handled-teens", + "metadata": {}, + "outputs": [], + "source": [ + "framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'\n", + "MI = ModelInterface(model=model_unet, optimizer=optimizer_adam, framework_plugin=framework_adapter)" + ] + }, + { + "cell_type": "markdown", + "id": "c3fd3f69", + "metadata": {}, + "source": [ + "### Choose an aggregation function" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9fd4c499", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "from openfl.component.aggregation_functions import Median, WeightedAverage, AggregationFunction\n", + "\n", + "#The Interactive API supports overriding of the aggregation function\n", + "class One_Good_Envoy(AggregationFunction):\n", + " def __init__(self, col_name='3', weight_scale: float = 0.5):\n", + " self.good_col = col_name\n", + " self.weight_scale = weight_scale\n", + "\n", + " def call(self, local_tensors, *_) -> np.ndarray:\n", + " weights = [x.weight if self.good_col in x.col_name else x.weight * self.weight_scale\n", + " for x in local_tensors]\n", + " tensors = np.array([x.tensor for x in local_tensors])\n", + " return np.average(tensors, weights=weights, axis=0)\n", + " \n", + " \n", + "# aggregation_function = One_Good_Envoy()\n", + "aggregation_function = WeightedAverage()" + ] + }, + { + "cell_type": "markdown", + "id": "portuguese-groove", + "metadata": {}, + "source": [ + "### Define and register FL tasks" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "increasing-builder", + "metadata": {}, + "outputs": [], + "source": [ + "TI = TaskInterface()\n", + "import torch\n", + "import tqdm\n", + "\n", + "\n", + "@TI.register_fl_task(model='unet_model', data_loader='train_loader', \\\n", + " device='device', optimizer='optimizer') \n", + 
"@TI.set_aggregation_function(aggregation_function)\n", + "def train(unet_model, train_loader, optimizer, device, loss_fn=soft_dice_loss):\n", + " \n", + " \"\"\" \n", + " The following constructions, that may lead to resource race\n", + " is no longer needed:\n", + " \n", + " if not torch.cuda.is_available():\n", + " device = 'cpu'\n", + " else:\n", + " device = 'cuda'\n", + " \n", + " \"\"\"\n", + "\n", + " print(f'\\n\\n TASK TRAIN GOT DEVICE {device}\\n\\n')\n", + " \n", + " train_loader = tqdm.tqdm(train_loader, desc=\"train\")\n", + " \n", + " unet_model.train()\n", + " unet_model.to(device)\n", + "\n", + " losses = []\n", + "\n", + " for data, target in train_loader:\n", + " data, target = torch.tensor(data).to(device), torch.tensor(\n", + " target).to(device, dtype=torch.float32)\n", + " optimizer.zero_grad()\n", + " output = unet_model(data)\n", + " loss = loss_fn(output=output, target=target)\n", + " loss.backward()\n", + " optimizer.step()\n", + " losses.append(loss.detach().cpu().numpy())\n", + " \n", + " return {'train_loss': np.mean(losses),}\n", + "\n", + "\n", + "@TI.register_fl_task(model='unet_model', data_loader='val_loader', device='device') \n", + "def validate(unet_model, val_loader, device):\n", + " print(f'\\n\\n TASK VALIDATE GOT DEVICE {device}\\n\\n')\n", + " \n", + " unet_model.eval()\n", + " unet_model.to(device)\n", + " \n", + " val_loader = tqdm.tqdm(val_loader, desc=\"validate\")\n", + "\n", + " val_score = 0\n", + " total_samples = 0\n", + "\n", + " with torch.no_grad():\n", + " for data, target in val_loader:\n", + " samples = target.shape[0]\n", + " total_samples += samples\n", + " data, target = torch.tensor(data).to(device), \\\n", + " torch.tensor(target).to(device, dtype=torch.int64)\n", + " output = unet_model(data)\n", + " val = soft_dice_coef(output, target)\n", + " val_score += val.sum().cpu().numpy()\n", + " \n", + " return {'dice_coef': val_score / total_samples,}" + ] + } + ], + "metadata": { + "kernelspec": { + 
"display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/layers.py b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/layers.py new file mode 100644 index 0000000000..5165dcc97e --- /dev/null +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/layers.py @@ -0,0 +1,100 @@ +"""Layers for Unet model.""" + +import torch +import torch.nn as nn +import torch.nn.functional as F + + +def soft_dice_loss(output, target): + """Calculate loss.""" + num = target.size(0) + m1 = output.view(num, -1) + m2 = target.view(num, -1) + intersection = m1 * m2 + score = 2.0 * (intersection.sum(1) + 1) / (m1.sum(1) + m2.sum(1) + 1) + score = 1 - score.sum() / num + return score + + +def soft_dice_coef(output, target): + """Calculate soft DICE coefficient.""" + num = target.size(0) + m1 = output.view(num, -1) + m2 = target.view(num, -1) + intersection = m1 * m2 + score = 2.0 * (intersection.sum(1) + 1) / (m1.sum(1) + m2.sum(1) + 1) + return score.sum() + + +class DoubleConv(nn.Module): + """Pytorch double conv class.""" + + def __init__(self, in_ch, out_ch): + """Initialize layer.""" + super(DoubleConv, self).__init__() + self.in_ch = in_ch + self.out_ch = out_ch + self.conv = nn.Sequential( + nn.Conv2d(in_ch, out_ch, 3, padding=1), + nn.BatchNorm2d(out_ch), + nn.ReLU(inplace=True), + nn.Conv2d(out_ch, out_ch, 3, padding=1), + nn.BatchNorm2d(out_ch), + nn.ReLU(inplace=True), + ) + + def forward(self, x): + """Do forward pass.""" + x = self.conv(x) + return x + + +class Down(nn.Module): + 
"""Pytorch nn module subclass.""" + + def __init__(self, in_ch, out_ch): + """Initialize layer.""" + super(Down, self).__init__() + self.mpconv = nn.Sequential( + nn.MaxPool2d(2), + DoubleConv(in_ch, out_ch) + ) + + def forward(self, x): + """Do forward pass.""" + x = self.mpconv(x) + return x + + +class Up(nn.Module): + """Pytorch nn module subclass.""" + + def __init__(self, in_ch, out_ch, bilinear=False): + """Initialize layer.""" + super(Up, self).__init__() + self.in_ch = in_ch + self.out_ch = out_ch + if bilinear: + self.up = nn.Upsample( + scale_factor=2, + mode='bilinear', + align_corners=True + ) + else: + self.up = nn.ConvTranspose2d(in_ch, in_ch // 2, 2, stride=2) + self.conv = DoubleConv(in_ch, out_ch) + + def forward(self, x1, x2): + """Do forward pass.""" + x1 = self.up(x1) + diff_y = x2.size()[2] - x1.size()[2] + diff_x = x2.size()[3] - x1.size()[3] + + x1 = F.pad( + x1, + (diff_x // 2, diff_x - diff_x // 2, diff_y // 2, diff_y - diff_y // 2) + ) + + x = torch.cat([x2, x1], dim=1) + x = self.conv(x) + return x diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/start_federation.ipynb b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/start_federation.ipynb new file mode 100644 index 0000000000..ca047a1e82 --- /dev/null +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/workspace_corruption_experiment/start_federation.ipynb @@ -0,0 +1,824 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "f813b6ae-b082-49bb-b64f-fd619b6de14a", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from pathlib import Path\n", + "import time\n", + "import yaml\n", + "from typing import Dict, List, Union" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "d1ee62ab-09e4-4f4c-984f-bdb6909d6106", + "metadata": {}, + "outputs": [], + "source": [ + "def generate_envoy_configs(config: Dict,\n", + " save_path: Union[str, Path] = 
'../envoy/',\n", + " n_cols_total: int = 10,\n", + " n_cols_corrupt: int = 1,\n", + " assign_gpus: bool = False,\n", + " image_hw: str = '300,400') -> List[Path]:\n", + " \n", + " config['shard_descriptor']['params']['enforce_image_hw'] = image_hw\n", + " \n", + " config_paths = [(Path(save_path) / f'{i}_envoy_config.yaml').absolute()\n", + " for i in range(1, n_cols_total + 1)]\n", + "\n", + " for i, path in enumerate(config_paths):\n", + " if assign_gpus:\n", + " config['params']['cuda_devices'] = [i,]\n", + " else:\n", + " config['params']['cuda_devices'] = []\n", + " if i < n_cols_corrupt:\n", + " config['shard_descriptor']['params']['corrupt'] = True\n", + " else:\n", + " config['shard_descriptor']['params']['corrupt'] = False\n", + " config['shard_descriptor']['params']['rank_worldsize'] = f'{i+1},{n_cols_total}'\n", + " with open(path, \"w\") as stream:\n", + " yaml.safe_dump(config, stream)\n", + " \n", + " return config_paths\n", + " \n", + "def remove_configs(config_paths):\n", + " for path in config_paths:\n", + " path.unlink()" + ] + }, + { + "cell_type": "markdown", + "id": "ec065be9-c2c6-4a81-9a2a-ea54794e52ba", + "metadata": {}, + "source": [ + "## Start the Director service" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "60bcaa49-aabb-42ec-a279-9e32b31ce6ca", + "metadata": {}, + "outputs": [], + "source": [ + "# cwd = Path.cwd()\n", + "# director_workspace_path = Path('../director/').absolute()\n", + "# director_config_file = director_workspace_path / 'director_config.yaml'\n", + "# director_logfile = director_workspace_path / 'director.log'\n", + "# director_logfile.unlink(missing_ok=True)\n", + "# # \n", + "\n", + "# os.environ['main_folder'] = str(cwd)\n", + "# os.environ['director_workspace_path'] = str(director_workspace_path)\n", + "# os.environ['director_logfile'] = str(director_logfile)\n", + "# os.environ['director_config_file'] = str(director_config_file)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + 
"id": "72a9268a-ee1e-4dda-a4c4-cfb29428f45e", + "metadata": {}, + "outputs": [], + "source": [ + "# %%script /bin/bash --bg\n", + "# cd $director_workspace_path\n", + "# fx director start --disable-tls -c $director_config_file > $director_logfile &\n", + "# cd $main_folder" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "f446a04d", + "metadata": {}, + "outputs": [], + "source": [ + "def start_director():\n", + " cwd = Path.cwd()\n", + " director_workspace_path = Path('../director/').absolute()\n", + " os.chdir(director_workspace_path)\n", + " director_config_file = director_workspace_path / 'director_config.yaml'\n", + " director_logfile = director_workspace_path / 'director.log'\n", + " director_logfile.unlink(missing_ok=True)\n", + " os.system('fx director start --disable-tls '\n", + " f'-c {director_config_file} > {director_logfile} &')\n", + " os.chdir(cwd)" + ] + }, + { + "cell_type": "markdown", + "id": "e0a634ea-9c62-4048-bb91-099fe9097b55", + "metadata": {}, + "source": [ + "## Start Envoys" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "13470bfd-d67e-48dc-b1ff-10c7ff526c0c", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# envoy_workspace_path = Path('../envoy/').absolute()\n", + "\"\"\"Make sure the Director port matches one from the Director config file\"\"\"\n", + "def start_envoys(config_paths: List[Path]) -> None:\n", + " envoy_workspace_path = config_paths[0].parent\n", + " cwd = Path.cwd()\n", + " os.chdir(envoy_workspace_path)\n", + " for i, path in enumerate(config_paths):\n", + " os.system(f'fx envoy start -n env_{i + 1} --disable-tls '\n", + " f'--envoy-config-path {path} -dh localhost -dp 50050 '\n", + " f'>env_{i + 1}.log &')\n", + " os.chdir(cwd)\n", + " \n", + "# start_envoys(config_paths)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "2fc8a569-6978-4c80-88d1-741799407239", + "metadata": {}, + "outputs": [], + "source": [ + "def 
start_federation(envoy_config_path='../envoy/envoy_config.yaml',\n", + " n_cols_total=3, n_cols_corrupt=0, deacticate_cols_indeces=(),\n", + " assign_gpus=True):\n", + " # Read the original envoy config file content\n", + " with open(Path(envoy_config_path), \"r\") as stream:\n", + " orig_config = yaml.safe_load(stream)\n", + " # Write new configs\n", + " config_paths = generate_envoy_configs(orig_config, n_cols_total=n_cols_total,\n", + " n_cols_corrupt=n_cols_corrupt, assign_gpus=assign_gpus)\n", + " for idx in deacticate_cols_indeces:\n", + " del config_paths[idx]\n", + " # Start Director and Envoys \n", + " start_director()\n", + " time.sleep(2)\n", + " start_envoys(config_paths)\n", + " time.sleep(2)\n", + "# remove_configs(config_paths)" + ] + }, + { + "cell_type": "markdown", + "id": "b6a56e31", + "metadata": {}, + "source": [ + "## Run experiments" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "38b1b422", + "metadata": {}, + "outputs": [], + "source": [ + "# from openfl.component.aggregation_functions import AggregationFunction\n", + "# import numpy as np\n", + "# class One_Good_Envoy(AggregationFunction):\n", + "# def __init__(self, col_name='env_3', weight_scale: float = 0.5):\n", + "# self.good_col = col_name\n", + "# self.weight_scale = weight_scale\n", + "\n", + "# def call(self, local_tensors, *_) -> np.ndarray:\n", + "# weights = [x.weight if self.good_col == x.col_name else x.weight * self.weight_scale\n", + "# for x in local_tensors]\n", + "# tensors = np.array([x.tensor for x in local_tensors])\n", + "# return np.average(tensors, weights=weights, axis=0)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "03129144", + "metadata": {}, + "outputs": [], + "source": [ + "from testbook import testbook\n", + "\n", + "\n", + "def start_experiment_from_ipynb(experiment_name, notebook_path='./PyTorch_Kvasir_UNet.ipynb',\n", + " n_rounds=40, train_batch_size=6, lr=1e-4, weight_decay=0,\n", + " 
opt_treatment='CONTINUE_GLOBAL'):\n", + " command = (\n", + " 'torch.manual_seed(0) \\n'\n", + " 'model_unet = UNet() \\n'\n", + " f'optimizer_adam = optim.Adam(model_unet.parameters(), lr={lr}, weight_decay={weight_decay}) \\n'\n", + " 'MI = ModelInterface(model=model_unet, optimizer=optimizer_adam, framework_plugin=framework_adapter) \\n'\n", + " f'fed_dataset = KvasirSD(train_bs={train_batch_size}, valid_bs=8) \\n'\n", + " f'fl_experiment = FLExperiment(federation=federation, experiment_name=\"{experiment_name}\") \\n'\n", + " 'fl_experiment.start(model_provider=MI, task_keeper=TI, data_loader=fed_dataset, \\n'\n", + " f'rounds_to_train={n_rounds}, \\n'\n", + " f\"opt_treatment='{opt_treatment}', \\n\"\n", + " \"device_assignment_policy='CUDA_PREFERRED') \\n\"\n", + " 'fl_experiment.stream_metrics()'\n", + " )\n", + " # We may execute only some cells, i.e. range(0,23)\n", + " # We should not execute `start_experiment`\n", + " with testbook(notebook_path, execute=True, timeout=None) as tb:\n", + " tb.inject(command, pop=True)" + ] + }, + { + "cell_type": "markdown", + "id": "91485b1f", + "metadata": {}, + "source": [ + "## Stop the Federation and Clean up" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "a5fdc3af-63b5-41b5-b9d6-be2aac8626e0", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "256" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# To stop all services run\n", + "# !pkill fx\n", + "os.system('pkill fx')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4e69ae57-bfa3-4047-af7f-3e1cf24ac35e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "b2598c95", + "metadata": {}, + "source": [ + "# Run all together " + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "2d1a8c11", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": 
"stream", + "text": [ + "File ‘kvasir_data/kvasir.zip’ already there; not retrieving.\n", + "WARNING: You are using pip version 22.0.3; however, version 22.0.4 is available.\n", + "You should consider upgrading via the '/home/idavidyu/.virtualenvs/corrupt-envoy/bin/python -m pip install --upgrade pip' command.\n", + "/home/idavidyu/openfl/openfl/plugins/frameworks_adapters/pytorch_adapter.py:47: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:141.)\n", + " new_state[k] = pt.from_numpy(tensor_dict.pop(k)).to(device)\n", + "train: 0%| | 0/49 [00:00 None: def get_dataset(self, dataset_type: str) -> np.ndarray: """ - Return a shard dataset by type. + Get a shard dataset. A simple list with elements (x, y) implemets the Shard Dataset interface. 
""" diff --git a/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/LinReg.ipynb b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/LinReg.ipynb new file mode 100644 index 0000000000..53670ee814 --- /dev/null +++ b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/LinReg.ipynb @@ -0,0 +1,515 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "689ee822", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -r requirements.txt" + ] + }, + { + "cell_type": "markdown", + "id": "d63e64c6-9955-4afc-8d04-d8c85bb28edc", + "metadata": {}, + "source": [ + "# Linear Regression with Numpy and OpenFL" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6c9eee14-22a1-4d48-a7da-e68d01037cd4", + "metadata": {}, + "outputs": [], + "source": [ + "from typing import List, Union\n", + "import numpy as np\n", + "import random\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "from matplotlib.pylab import rcParams\n", + "rcParams['figure.figsize'] = 7, 5" + ] + }, + { + "cell_type": "markdown", + "id": "c4b334ef-6a72-4b82-b978-1401973d0512", + "metadata": { + "tags": [] + }, + "source": [ + "# We will use MSE as loss function and Ridge weights regularization\n", + "![image.png](https://www.analyticsvidhya.com/wp-content/uploads/2016/01/eq5-1.png)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4cc8ec2-b818-4db8-8700-39c1a12917df", + "metadata": {}, + "outputs": [], + "source": [ + "class LinRegLasso:\n", + " def __init__(self, n_feat: int) -> None:\n", + " self.weights = np.ones((n_feat + 1)) # (n_feat + 1,) weights + bias\n", + " \n", + " def predict(self, feature_vector: Union[np.ndarray, List[int]]) -> float:\n", + " '''\n", + " feature_vector may be a list or have shape (n_feat,)\n", + " or it may be a bunch of vectors (n_vec, nfeat)\n", + " '''\n", + " feature_vector = 
np.array(feature_vector)\n",
+    "        if len(feature_vector.shape) == 1:\n",
+    "            feature_vector = feature_vector[:,np.newaxis]\n",
+    "        assert feature_vector.shape[-1] == self.weights.shape[0] - 1, \\\n",
+    "            f\"sample shape is {feature_vector.shape} and weights shape is {self.weights.shape}\"\n",
+    "        \n",
+    "        return self.weights @ np.concatenate((feature_vector.T, [[1]*feature_vector.shape[0]]))\n",
+    "    \n",
+    "    def mse(self, X: np.ndarray, Y: np.ndarray) -> float:\n",
+    "        Y_hat = self.predict(X)\n",
+    "        return np.sum((Y - Y_hat)**2) / Y.shape[0]\n",
+    "\n",
+    "    def _update_weights(self, X: np.ndarray, Y: np.ndarray, lr: float, wd: float) -> None:\n",
+    "        '''\n",
+    "        X: (n_samples, n_features)\n",
+    "        Y: (n_samples,)\n",
+    "        self.weights: (n_features + 1)\n",
+    "        \n",
+    "        Cost function is MSE: (y - W*X - b)**2;\n",
+    "        its derivative with respect to W is -2*X*(y - W*X - b),\n",
+    "        and with respect to b is -2*(y - W*X - b).\n",
+    "        \n",
+    "        Regularisation function is L1 |W|;\n",
+    "        its derivative is SIGN(w); it is subtracted (scaled by wd) to shrink weights\n",
+    "        '''\n",
+    "        predictions = self.predict(X)\n",
+    "        error = Y - predictions  # (n_samples,)\n",
+    "        X_with_bias = np.concatenate((X.T, [[1]*X.shape[0]])).T\n",
+    "        updates = -2 * X_with_bias.T @ error / Y.shape[0]\n",
+    "        regression_term = np.sign(self.weights)\n",
+    "        \n",
+    "        self.weights = self.weights - lr * updates - wd * regression_term\n",
+    "    \n",
+    "    def fit(self, X: np.ndarray, Y: np.ndarray,\n",
+    "            n_epochs: int, lr: float, wd: float,\n",
+    "            silent: bool=False) -> None:\n",
+    "        for i in range(n_epochs):\n",
+    "            self._update_weights(X, Y, lr, wd)\n",
+    "            mse = self.mse(X, Y)\n",
+    "            if not silent:\n",
+    "                print(f'epoch: {i}, \\t MSE: {mse}')\n",
+    "            "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "af89e7e5-6cfc-46bc-acd2-7d5bfb373091",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Define input array with angles from 60deg to 300deg converted to radians\n",
+    "x = np.array([i*np.pi/180 for i in range(60,300,4)])\n",
+    "
"np.random.seed(10) # Setting seed for reproducibility\n", + "y = np.sin(x) + np.random.normal(0,0.15,len(x))\n", + "# plt.plot(x,y,'.')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ffefca2b-d7f6-4111-8872-c017c182a2de", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "lr_model = LinRegLasso(1)\n", + "wd = 0.0001\n", + "lr = 0.08\n", + "epochs = 100\n", + "\n", + "print(f\"Initila MSE: {lr_model.mse(x,y)}\")\n", + "lr_model.fit(x[:,np.newaxis],y, epochs, lr, wd, silent=True)\n", + "print(f\"Final MSE: {lr_model.mse(x,y)}\")\n", + "print(f\"Final parameters: {lr_model.weights}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "410f2d80-989a-43ab-958f-7b68fd8f2e90", + "metadata": {}, + "outputs": [], + "source": [ + "# We can also solve this 1D problem using Numpy\n", + "numpy_solution = np.polyfit(x,y,1)\n", + "predictor_np = np.poly1d(numpy_solution)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6cb323db-9f3a-42af-94da-4b170adef867", + "metadata": {}, + "outputs": [], + "source": [ + "y_hat = lr_model.predict(x)\n", + "y_np = predictor_np(x)\n", + "plt.plot(x,y,'.')\n", + "plt.plot(x,y_hat,'.')\n", + "plt.plot(x,y_np,'--')" + ] + }, + { + "cell_type": "markdown", + "id": "ffd4d2d7-5537-496a-88c1-301da87d979c", + "metadata": {}, + "source": [ + "# Now we run the same training on federated data" + ] + }, + { + "cell_type": "markdown", + "id": "09cf7090-da51-4f4e-9d28-2a5c6e3bca02", + "metadata": {}, + "source": [ + "## Connect to a Federation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1b3c0039-e1f7-4047-b98b-a2d4bd42f015", + "metadata": {}, + "outputs": [], + "source": [ + "# Create a federation\n", + "from openfl.interface.interactive_api.federation import Federation\n", + "\n", + "# please use the same identificator that was used in signed certificate\n", + "client_id = 'frontend'\n", + "director_node_fqdn = 'localhost'\n", + 
"director_port = 50049\n", + "\n", + "federation = Federation(\n", + " client_id=client_id,\n", + " director_node_fqdn=director_node_fqdn,\n", + " director_port=director_port,\n", + " tls=False\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7815120e-b704-4a7d-a65a-3c7542023ead", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "shard_registry = federation.get_shard_registry()\n", + "shard_registry" + ] + }, + { + "cell_type": "markdown", + "id": "b011dd95-64a7-4a8b-91ec-e61cdf885bbb", + "metadata": {}, + "source": [ + "### Data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1985ac9-a2b1-4561-a962-6adfe35c3b97", + "metadata": {}, + "outputs": [], + "source": [ + "from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment\n", + "\n", + "class LinRegDataSet(DataInterface):\n", + " def __init__(self, **kwargs):\n", + " \"\"\"Initialize DataLoader.\"\"\"\n", + " self.kwargs = kwargs\n", + " pass\n", + "\n", + " @property\n", + " def shard_descriptor(self):\n", + " \"\"\"Return shard descriptor.\"\"\"\n", + " return self._shard_descriptor\n", + " \n", + " @shard_descriptor.setter\n", + " def shard_descriptor(self, shard_descriptor):\n", + " \"\"\"\n", + " Describe per-collaborator procedures or sharding.\n", + "\n", + " This method will be called during a collaborator initialization.\n", + " Local shard_descriptor will be set by Envoy.\n", + " \"\"\"\n", + " self._shard_descriptor = shard_descriptor\n", + " self.train_set = shard_descriptor.get_dataset(\"train\")\n", + " self.val_set = shard_descriptor.get_dataset(\"val\")\n", + "\n", + " def get_train_loader(self, **kwargs):\n", + " \"\"\"Output of this method will be provided to tasks with optimizer in contract.\"\"\"\n", + " return self.train_set\n", + "\n", + " def get_valid_loader(self, **kwargs):\n", + " \"\"\"Output of this method will be provided to tasks without optimizer in 
contract.\"\"\"\n", + " return self.val_set\n", + "\n", + " def get_train_data_size(self):\n", + " \"\"\"Information for aggregation.\"\"\"\n", + " return len(self.train_set)\n", + "\n", + " def get_valid_data_size(self):\n", + " \"\"\"Information for aggregation.\"\"\"\n", + " return len(self.val_set)\n", + " \n", + "lin_reg_dataset = LinRegDataSet()" + ] + }, + { + "cell_type": "markdown", + "id": "b8909127-99d1-4dba-86fe-01a1b86585e7", + "metadata": {}, + "source": [ + "### Model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9523c9a2-a259-461f-937f-1fb054bd2886", + "metadata": {}, + "outputs": [], + "source": [ + "framework_adapter = 'custom_adapter.CustomFrameworkAdapter'\n", + "fed_model = LinRegLasso(1)\n", + "MI = ModelInterface(model=fed_model, optimizer=None, framework_plugin=framework_adapter)\n", + "\n", + "# Save the initial model state\n", + "initial_model = LinRegLasso(1)" + ] + }, + { + "cell_type": "markdown", + "id": "2e3558bb-b21b-48ac-b07e-43cf75e6907b", + "metadata": {}, + "source": [ + "### Tasks\n", + "We need to employ a trick reporting metrics. OpenFL decides which model is the best based on an *increasing* metric." 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f73e1ff9-d54a-49b5-9ce8-8bc72c6a2c6f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "TI = TaskInterface()\n",
+    "\n",
+    "@TI.add_kwargs(**{'lr': 0.001,\n",
+    "                   'wd': 0.0001,\n",
+    "                   'epoches': 1})\n",
+    "@TI.register_fl_task(model='my_model', data_loader='train_dataset', \\\n",
+    "                     device='device', optimizer='optimizer') \n",
+    "def train(my_model, train_dataset, optimizer, device, lr, wd, epoches):\n",
+    "    X, Y = [], []\n",
+    "    for x, y in train_dataset:\n",
+    "        X.append(x)\n",
+    "        Y.append(y)\n",
+    "    X, Y = np.array(X)[:, np.newaxis], np.array(Y)\n",
+    "    my_model.fit(X, Y, epoches, lr, wd, silent=True)  # use the `epoches` task kwarg, not a notebook global\n",
+    "    return {'train_MSE': my_model.mse(X, Y),}\n",
+    "\n",
+    "@TI.register_fl_task(model='my_model', data_loader='val_dataset', device='device') \n",
+    "def validate(my_model, val_dataset, device):\n",
+    "    X, Y = [], []\n",
+    "    for x, y in val_dataset:\n",
+    "        X.append(x)\n",
+    "        Y.append(y) \n",
+    "    X, Y = np.array(X)[:, np.newaxis], np.array(Y)\n",
+    "    return {'validation_MSE': my_model.mse(X, Y),}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c4d8004a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "a = np.array([1,2,3])\n",
+    "a[:, np.newaxis]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "01a935b9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "a = np.arange(10).reshape(-1,2)\n",
+    "a[:,:-1]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "ee7659cc-6e03-43f5-9078-95707fa0e4d5",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "### Run"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "749100e8-05ce-418c-a980-545e3beb900b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "experiment_name = 'linear_regression_experiment_uncorrupted'\n",
+    "fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name,\n",
+    "                            )"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 
null, + "id": "16bf1df7-8ca8-4a5e-a833-47c265c11e05", + "metadata": {}, + "outputs": [], + "source": [ + "fl_experiment.start(model_provider=MI, \n", + " task_keeper=TI,\n", + " data_loader=lin_reg_dataset,\n", + " rounds_to_train=10,)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1178d1ea-05e6-46be-ac07-21620bd6ec76", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "fl_experiment.stream_metrics()" + ] + }, + { + "cell_type": "markdown", + "id": "af331ccd-66b4-4925-8627-52cf03ceea5e", + "metadata": { + "tags": [] + }, + "source": [ + "### Optional: start tensorboard" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fde4ed4d-dda5-4bab-8dd3-e1ac44f5acf9", + "metadata": {}, + "outputs": [], + "source": [ + "%%script /bin/bash --bg\n", + "tensorboard --host $(hostname --all-fqdns | awk '{print $1}') --logdir logs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7aa78602-b66a-4378-bea9-e915f2a1fdd8", + "metadata": {}, + "outputs": [], + "source": [ + "last_model = fl_experiment.get_last_model()\n", + "best_model = fl_experiment.get_best_model()\n", + "print(best_model.weights)\n", + "print(last_model.weights)\n", + "print(f\"last model MSE: {last_model.mse(x,y)}\")\n", + "print(f\"best model MSE: {best_model.mse(x,y)}\")" + ] + }, + { + "cell_type": "markdown", + "id": "ae66d688", + "metadata": {}, + "source": [ + "### Evaluate results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "573417e0", + "metadata": {}, + "outputs": [], + "source": [ + "n_cols = 20\n", + "n_samples = 4\n", + "interval = 240\n", + "x_start = 60\n", + "noise = 0.3\n", + "\n", + "X = None\n", + "\n", + "for rank in range(n_cols):\n", + " np.random.seed(rank) # Setting seed for reproducibility\n", + " x = np.random.rand(n_samples, 1) * interval + x_start\n", + " x *= np.pi / 180\n", + " X = x if X is None else np.vstack((X,x))\n", + " y = np.sin(x) + np.random.normal(0, noise, 
size=(n_samples, 1))\n",
+    "    plt.plot(x,y,'+')\n",
+    "    \n",
+    "X.sort(axis=0)  # X is (n, 1): sort along the sample axis, the default last axis is a no-op\n",
+    "Y_hat = last_model.predict(X)\n",
+    "plt.plot(X,Y_hat,'--')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "84e927c8",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.8.10"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/custom_adapter.py b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/custom_adapter.py
new file mode 100644
index 0000000000..e7bb5b372a
--- /dev/null
+++ b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/custom_adapter.py
@@ -0,0 +1,21 @@
+# Copyright (C) 2020-2021 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+"""Custom model numpy adapter."""
+
+from openfl.plugins.frameworks_adapters.framework_adapter_interface import (
+    FrameworkAdapterPluginInterface,
+)
+
+
+class CustomFrameworkAdapter(FrameworkAdapterPluginInterface):
+    """Framework adapter plugin class."""
+
+    @staticmethod
+    def get_tensor_dict(model, optimizer=None):
+        """Extract tensors from a model."""
+        return {'w': model.weights}
+
+    @staticmethod
+    def set_tensor_dict(model, tensor_dict, optimizer=None, device='cpu'):
+        """Load tensors to a model."""
+        model.weights = tensor_dict['w']
diff --git a/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/requirements.txt b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/requirements.txt
new file mode 100644
index 
0000000000..452b8739fb --- /dev/null +++ b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/requirements.txt @@ -0,0 +1,4 @@ +openfl==1.2.1 +numpy +jupyterlab +matplotlib \ No newline at end of file diff --git a/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/start_federation.ipynb b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/start_federation.ipynb new file mode 100644 index 0000000000..d234169cd9 --- /dev/null +++ b/openfl-tutorials/interactive_api/numpy_linear_regression/workspace_corrupt_envoy/start_federation.ipynb @@ -0,0 +1,232 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "f813b6ae-b082-49bb-b64f-fd619b6de14a", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from pathlib import Path\n", + "import yaml\n", + "from typing import Dict, List, Union" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1ee62ab-09e4-4f4c-984f-bdb6909d6106", + "metadata": {}, + "outputs": [], + "source": [ + "# Read the original envoy config file content\n", + "with open(Path('../envoy/envoy_config.yaml'), \"r\") as stream:\n", + " orig_config = yaml.safe_load(stream)\n", + "\n", + "def generate_envoy_configs(config: Dict,\n", + " save_path: Union[str, Path] = '../envoy/',\n", + " n_cols: int = 10,\n", + " n_samples_per_col: int = 10,\n", + " noise: float = 0.15) -> List[Path]:\n", + " \n", + " config['shard_descriptor']['params']['n_samples'] = n_samples_per_col\n", + " config['shard_descriptor']['params']['noise'] = noise\n", + " \n", + " config_paths = [(Path(save_path) / f'{i}_envoy_config.yaml').absolute()\n", + " for i in range(1, n_cols + 1)]\n", + "\n", + " for i, path in enumerate(config_paths):\n", + " if i == 0:\n", + " config['shard_descriptor']['params']['corrupt'] = True\n", + " else:\n", + " config['shard_descriptor']['params']['corrupt'] = False\n", + " 
config['shard_descriptor']['params']['rank'] = i\n", + " with open(path, \"w\") as stream:\n", + " yaml.safe_dump(config, stream)\n", + " \n", + " return config_paths\n", + " \n", + "def remove_configs(config_paths):\n", + " for path in config_paths:\n", + " path.unlink()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d058340-22d4-4630-b8e3-9c3fc29198ab", + "metadata": {}, + "outputs": [], + "source": [ + "config_paths = generate_envoy_configs(orig_config, n_cols=5, n_samples_per_col=20, noise=0.2)\n", + "# remove_configs(config_paths)" + ] + }, + { + "cell_type": "markdown", + "id": "ec065be9-c2c6-4a81-9a2a-ea54794e52ba", + "metadata": {}, + "source": [ + "## Start the Director service" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "60bcaa49-aabb-42ec-a279-9e32b31ce6ca", + "metadata": {}, + "outputs": [], + "source": [ + "cwd = Path.cwd()\n", + "director_workspace_path = Path('../director/').absolute()\n", + "director_config_file = director_workspace_path / 'director_config.yaml'\n", + "director_logfile = director_workspace_path / 'director.log'\n", + "director_logfile.unlink(missing_ok=True)\n", + "# \n", + "\n", + "os.environ['main_folder'] = str(cwd)\n", + "os.environ['director_workspace_path'] = str(director_workspace_path)\n", + "os.environ['director_logfile'] = str(director_logfile)\n", + "os.environ['director_config_file'] = str(director_config_file)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "72a9268a-ee1e-4dda-a4c4-cfb29428f45e", + "metadata": {}, + "outputs": [], + "source": [ + "%%script /bin/bash --bg\n", + "cd $director_workspace_path\n", + "fx director start --disable-tls -c $director_config_file > $director_logfile &\n", + "cd $main_folder" + ] + }, + { + "cell_type": "markdown", + "id": "e0a634ea-9c62-4048-bb91-099fe9097b55", + "metadata": {}, + "source": [ + "## Start Envoys" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"13470bfd-d67e-48dc-b1ff-10c7ff526c0c", + "metadata": { + "scrolled": false + }, + "outputs": [], + "source": [ + "# envoy_workspace_path = Path('../envoy/').absolute()\n", + "def start_envoys(config_paths: List[Path]) -> None:\n", + " envoy_workspace_path = config_paths[0].parent\n", + " cwd = Path.cwd()\n", + " os.chdir(envoy_workspace_path)\n", + " for i, path in enumerate(config_paths):\n", + " os.system(f'fx envoy start -n env_{i + 1} --disable-tls '\n", + " f'--envoy-config-path {path} -dh localhost -dp 50049 '\n", + " f'>env_{i + 1}.log &')\n", + " os.chdir(cwd)\n", + " \n", + "start_envoys(config_paths)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2fc8a569-6978-4c80-88d1-741799407239", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a5fdc3af-63b5-41b5-b9d6-be2aac8626e0", + "metadata": {}, + "outputs": [], + "source": [ + "# To stop all services run\n", + "!pkill fx" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4e69ae57-bfa3-4047-af7f-3e1cf24ac35e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "remove_configs(config_paths)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "46095127-f116-4ae3-a3b4-6be24064b49f", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2ae08040", + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.insert(0, '..')\n", + "from envoy.linreg_shard_descriptor import LinRegSD\n", + "sd = LinRegSD(rank=1, corrupt=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "59d00b8e", + "metadata": {}, + "outputs": [], + "source": [ + "ds = sd.get_dataset(dataset_type='val')\n", + "x, y = ds[0]\n", + "print(x, y)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4abccde", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": 
{ + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/openfl/interface/interactive_api/experiment.py b/openfl/interface/interactive_api/experiment.py index 73f02f8ef8..a63964f3d5 100644 --- a/openfl/interface/interactive_api/experiment.py +++ b/openfl/interface/interactive_api/experiment.py @@ -182,12 +182,13 @@ def start(self, *, model_provider, task_keeper, data_loader, delta_updates - [bool] Tells if collaborators should send delta updates for the locally tuned models. If set to False, whole checkpoints will be sent. opt_treatment - Optimizer state treatment policy. - Valid options: 'RESET' - reinitialize optimizer for every round, + Valid options: + 'RESET' - reinitialize optimizer for every round, 'CONTINUE_LOCAL' - keep local optimizer state, 'CONTINUE_GLOBAL' - aggregate optimizer state. device_assignment_policy - device assignment policy. - Valid options: 'CPU_ONLY' - device parameter passed to tasks - will always be 'cpu', + Valid options: + 'CPU_ONLY' - device parameter passed to tasks will always be 'cpu', 'CUDA_PREFERRED' - enable passing CUDA device identifiers to tasks by collaborators, works with cuda-device-monitor plugin equipped Envoys. 
pip_install_options - tuple of options for the remote `pip install` calls,
diff --git a/openfl/utilities/dataset_spoilers/__init__.py b/openfl/utilities/dataset_spoilers/__init__.py
new file mode 100644
index 0000000000..31f6001b85
--- /dev/null
+++ b/openfl/utilities/dataset_spoilers/__init__.py
@@ -0,0 +1,14 @@
+# Copyright (C) 2020-2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+"""Dataset spoilers package."""
+
+from .shard_corruptor import corrupt_shard
+from .dataset_spoil_methods import spoil_targets_random_choice
+from .dataset_spoil_methods import spoil_targets_rotation
+
+
+__all__ = [
+    'corrupt_shard',
+    'spoil_targets_random_choice',
+    'spoil_targets_rotation',
+]
diff --git a/openfl/utilities/dataset_spoilers/dataset_spoil_methods.py b/openfl/utilities/dataset_spoilers/dataset_spoil_methods.py
new file mode 100644
index 0000000000..a3f90701c1
--- /dev/null
+++ b/openfl/utilities/dataset_spoilers/dataset_spoil_methods.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2020-2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+"""Dataset spoilers module."""
+
+import random
+from typing import Callable
+
+from openfl.interface.interactive_api.shard_descriptor import ShardDataset
+
+
+def _spoil_targets(
+        shard_dataset: ShardDataset,
+        pick_target_index: Callable[[int, int], int],
+) -> ShardDataset:
+    """
+    Wrap a Shard Dataset so that samples are paired with foreign targets.
+
+    Args:
+        shard_dataset: dataset yielding `(sample, target)` pairs.
+        pick_target_index: called as `pick_target_index(index, length)`;
+            returns the index of the element whose target is served
+            instead of the genuine target of element `index`.
+
+    Returns:
+        A lazy view over `shard_dataset`; nothing is copied, both elements
+        are fetched from the wrapped dataset on every access.
+    """
+    class SpoiledDataset(ShardDataset):
+        """Shard Dataset view that substitutes targets."""
+
+        def __getitem__(self, index):
+            """Return the genuine sample with a substituted target."""
+            sample, _ = shard_dataset[index]
+            _, target = shard_dataset[pick_target_index(index, len(self))]
+            return sample, target
+
+        def __len__(self):
+            """Return the number of elements in the wrapped dataset."""
+            return len(shard_dataset)
+
+    return SpoiledDataset()
+
+
+def spoil_targets_rotation(shard_dataset: ShardDataset) -> ShardDataset:
+    """
+    Spoiler method that shifts targets index by 1.
+
+    Element `i` is served with the target of element `i + 1`;
+    the last element wraps around to the target of the first one.
+    """
+    return _spoil_targets(shard_dataset, lambda index, length: (index + 1) % length)
+
+
+def spoil_targets_random_choice(shard_dataset: ShardDataset) -> ShardDataset:
+    """
+    Spoiler method that takes a random target from the Dataset.
+
+    The target index is redrawn from the global `random` generator on every
+    access, so the same index may be served with different targets.
+    """
+    return _spoil_targets(shard_dataset, lambda _index, length: random.randrange(length))
diff --git 
a/openfl/utilities/dataset_spoilers/shard_corruptor.py b/openfl/utilities/dataset_spoilers/shard_corruptor.py
new file mode 100644
index 0000000000..e7202395ce
--- /dev/null
+++ b/openfl/utilities/dataset_spoilers/shard_corruptor.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2020-2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+"""Shard corruptor module."""
+
+from typing import Callable
+from typing import Type
+
+from openfl.interface.interactive_api.shard_descriptor import ShardDataset
+from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor
+
+
+def corrupt_shard(spoil_method: Callable[[ShardDataset], ShardDataset]) -> Callable:
+    """
+    Corrupting Shard Descriptor wrapper.
+
+    Decorating a Shard Descriptor (SD) class with this function will result in the following:
+    1. `corrupt` parameter is added to the SD init function.
+        The Envoy manager may enable corruption by putting `corrupt: true` to SD params.
+    2. `spoil_method` passed to the wrapper will be used to spoil Shard Datasets of this Envoy
+        if corruption is enabled.
+
+    Args:
+        spoil_method: callable that takes the genuine Shard Dataset
+            and returns the spoiled one.
+
+    Returns:
+        A class decorator; the class it returns subclasses the decorated SD
+        and keeps its name, qualified name, module and docstring.
+    """
+    def decorator_func(ShardDescriptorClass: Type[ShardDescriptor]) -> Type[ShardDescriptor]:
+        # This decorator is aware of the chosen `spoil_method`
+        class WrapperClass(ShardDescriptorClass):
+            """Extended SD class that is able to incorporate corruption."""
+
+            def __init__(self, corrupt: bool = False, **kwargs):
+                """Remember the `corrupt` flag and forward other params to the SD."""
+                self.corrupt = corrupt
+                super().__init__(**kwargs)
+
+            def get_dataset(self, *args, **kwargs):
+                """Return the SD dataset, spoiled if corruption is enabled."""
+                original_shard_dataset = super().get_dataset(*args, **kwargs)
+                if not self.corrupt:
+                    return original_shard_dataset
+
+                return spoil_method(original_shard_dataset)
+
+        # The subclass takes the place of the decorated class, so it also takes
+        # over its identity: `repr` and other introspection report the real
+        # SD name instead of `corrupt_shard.<locals>...WrapperClass`.
+        WrapperClass.__name__ = ShardDescriptorClass.__name__
+        WrapperClass.__qualname__ = ShardDescriptorClass.__qualname__
+        WrapperClass.__module__ = ShardDescriptorClass.__module__
+        WrapperClass.__doc__ = ShardDescriptorClass.__doc__
+
+        return WrapperClass
+
+    return decorator_func