From 8a6cd51e8d67b7a6c11e020b0c44ca841a99986e Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Wed, 24 Feb 2016 00:56:54 +0800 Subject: [PATCH 01/18] Modified some example conf files for consistent benchmarking. --- examples/cifar10/cudnn_hybrid.conf | 6 +- examples/cifar10/hybrid.conf | 4 +- examples/cifar10/job.conf | 3 + examples/mnist/hybrid.conf | 256 +++++++++++++++++++++++++++++ 4 files changed, 264 insertions(+), 5 deletions(-) create mode 100644 examples/mnist/hybrid.conf diff --git a/examples/cifar10/cudnn_hybrid.conf b/examples/cifar10/cudnn_hybrid.conf index a11145c170..3a60a96697 100644 --- a/examples/cifar10/cudnn_hybrid.conf +++ b/examples/cifar10/cudnn_hybrid.conf @@ -1,10 +1,10 @@ name: "cifar10-convnet" -train_steps: 10000 +train_steps: 1000 test_steps: 0 test_freq: 200 #validate_steps: 100 #validate_freq: 300 -disp_freq: 200 +disp_freq: 50 gpu: 0 gpu: 1 #debug: true @@ -43,7 +43,6 @@ neuralnet { shape: 32 } include: kTrain - partition_dim: 0 } # layer{ # name: "data" @@ -73,7 +72,6 @@ neuralnet { shape: 32 } include: kTest - partition_dim: 0 } layer { diff --git a/examples/cifar10/hybrid.conf b/examples/cifar10/hybrid.conf index ec3da0cda7..19b1dc54b0 100644 --- a/examples/cifar10/hybrid.conf +++ b/examples/cifar10/hybrid.conf @@ -4,7 +4,9 @@ test_steps: 0 test_freq: 200 #validate_steps: 100 #validate_freq: 300 -disp_freq: 30 +disp_freq: 50 + + #debug: true #checkpoint_path: "examples/cifar10/checkpoint/step1000-worker0" train_one_batch { diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf index d20b452e34..433a5f7cc7 100644 --- a/examples/cifar10/job.conf +++ b/examples/cifar10/job.conf @@ -5,6 +5,9 @@ test_freq: 200 #validate_steps: 100 #validate_freq: 300 disp_freq: 50 + + + #checkpoint_path: "examples/cifar10/checkpoint/step1000-worker0" train_one_batch { alg: kBP diff --git a/examples/mnist/hybrid.conf b/examples/mnist/hybrid.conf new file mode 100644 index 0000000000..a0706f7107 --- /dev/null +++ b/examples/mnist/hybrid.conf 
@@ -0,0 +1,256 @@ +name: "mlp" +train_steps: 1000 +test_steps:0 +test_freq:60 +disp_freq:10 + +train_one_batch { + alg: kBP +} + +updater{ + type: kSGD + learning_rate{ + type : kStep + base_lr: 0.001 + step_conf{ + change_freq: 60 + gamma: 0.997 + } + } +} + +neuralnet { + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/train_data.bin" + random_skip: 5000 + batchsize: 64 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTrain + } + + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/test_data.bin" + batchsize: 100 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTest + } + + layer{ + name: "fc1" + partition_dim: 0 + type: kInnerProduct + srclayers:"data" + innerproduct_conf{ + num_output: 2500 + } + param{ + name: "w1" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b1" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh1" + partition_dim: 0 + type: kSTanh + srclayers:"fc1" + } + layer{ + name: "fc2" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh1" + innerproduct_conf{ + num_output: 2000 + } + param{ + name: "w2" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b2" + init { + type: kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh2" + partition_dim: 0 + type: kSTanh + srclayers:"fc2" + } + layer{ + name: "fc3" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh2" + innerproduct_conf{ + num_output: 1500 + } + param{ + name: "w3" + init{ + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b3" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh3" + partition_dim: 0 + type: kSTanh + srclayers:"fc3" + } + layer{ + name: "fc4" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh3" + innerproduct_conf{ + num_output: 1000 + } + param{ + 
name: "w4" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b4" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh4" + partition_dim: 0 + type: kSTanh + srclayers:"fc4" + } + layer{ + name: "fc5" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh4" + innerproduct_conf{ + num_output: 500 + } + param{ + name: "w5" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b5" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh5" + partition_dim: 0 + type: kSTanh + srclayers:"fc5" + } + layer{ + name: "fc6" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh5" + innerproduct_conf{ + num_output: 10 + } + param{ + name: "w6" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b6" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + layer{ + name: "loss" + type:kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"fc6" + srclayers:"data" + } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + nworkers_per_group: 2 + nworkers_per_procs: 2 + workspace: "examples/mnist" +} From 2204dd761ba3e502a468a29d9032b22d60939714 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Wed, 24 Feb 2016 01:17:38 +0800 Subject: [PATCH 02/18] LayerProto in job.proto specifies that extensions should use 1001 to 1100. 
--- examples/rnnlm/rnnlm.proto | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/rnnlm/rnnlm.proto b/examples/rnnlm/rnnlm.proto index 8cfec8682b..4a4dcbc17d 100644 --- a/examples/rnnlm/rnnlm.proto +++ b/examples/rnnlm/rnnlm.proto @@ -39,9 +39,9 @@ message DataProto { } extend singa.LayerProto { - optional EmbeddingProto embedding_conf = 101; - optional LossProto loss_conf = 102; - optional DataProto data_conf = 103; + optional EmbeddingProto embedding_conf = 1001; + optional LossProto loss_conf = 1002; + optional DataProto data_conf = 1003; } message WordRecord { From bcbcffb96cf225af99f258e0ceb4afd545b154ad Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Wed, 24 Feb 2016 14:59:05 +0800 Subject: [PATCH 03/18] Created more sample conf files --- examples/cifar10/gpu.conf | 282 +++++++++++++++++++++++++++++++ examples/mnist/gpu.conf | 243 ++++++++++++++++++++++++++ examples/mnist/hybrid.conf | 4 +- examples/mnist/hybrid_gpu.conf | 257 ++++++++++++++++++++++++++++ examples/mnist/hybrid_gpux2.conf | 258 ++++++++++++++++++++++++++++ examples/mnist/job.conf | 6 +- src/driver.cc | 1 + src/worker.cc | 1 + 8 files changed, 1048 insertions(+), 4 deletions(-) create mode 100644 examples/cifar10/gpu.conf create mode 100644 examples/mnist/gpu.conf create mode 100644 examples/mnist/hybrid_gpu.conf create mode 100644 examples/mnist/hybrid_gpux2.conf diff --git a/examples/cifar10/gpu.conf b/examples/cifar10/gpu.conf new file mode 100644 index 0000000000..6dbf72d6cb --- /dev/null +++ b/examples/cifar10/gpu.conf @@ -0,0 +1,282 @@ +name: "cifar10-convnet" +train_steps: 1000 +test_steps: 100 +test_freq: 200 +#validate_steps: 100 +#validate_freq: 300 +disp_freq: 50 +gpu:0 + + +#checkpoint_path: "examples/cifar10/checkpoint/step1000-worker0" +train_one_batch { + alg: kBP +} +updater{ + type: kSGD + weight_decay:0.004 + momentum:0.9 + learning_rate { + type: kFixedStep + fixedstep_conf:{ + step:0 + step:60000 + step:65000 + step_lr:0.001 + step_lr:0.0001 + 
step_lr:0.00001 + } + } +} +neuralnet { + layer{ + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/cifar10/train_data.bin" + mean_file: "examples/cifar10/image_mean.bin" + batchsize: 100 + #random_skip: 5000 + shape: 3 + shape: 32 + shape: 32 + } + include: kTrain + } +# layer{ +# name: "data" +# type: kRecordInput +# store_conf { +# backend: "kvfile" +# path: "examples/cifar10/val_data.bin" +# mean_file: "examples/cifar10/image_mean.bin" +# batchsize: 64 +# random_skip: 5000 +# shape: 3 +# shape: 32 +# shape: 32 +# } +# include: kVal +# } + layer{ + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/cifar10/test_data.bin" + mean_file: "examples/cifar10/image_mean.bin" + batchsize: 100 + shape: 3 + shape: 32 + shape: 32 + } + include: kTest + } + + layer { + name: "conv1" + type: kCConvolution + srclayers: "data" + convolution_conf { + num_filters: 32 + kernel: 5 + stride: 1 + pad:2 + } + param { + name: "w1" + init { + type:kGaussian + std:0.0001 + } + } + param { + name: "b1" + lr_scale:2.0 + init { + type: kConstant + value:0 + } + } + } + + layer { + name: "pool1" + type: kCPooling + srclayers: "conv1" + pooling_conf { + pool: MAX + kernel: 3 + stride: 2 + } + } + layer { + name: "relu1" + type: kReLU + srclayers:"pool1" + } + layer { + name: "norm1" + type: kLRN + lrn_conf { + local_size: 3 + alpha: 5e-05 + beta: 0.75 + } + srclayers:"relu1" + } + layer { + name: "conv2" + type: kCConvolution + srclayers: "norm1" + convolution_conf { + num_filters: 32 + kernel: 5 + stride: 1 + pad:2 + } + param { + name: "w2" + init { + type:kGaussian + std:0.01 + } + } + param { + name: "b2" + lr_scale:2.0 + init { + type: kConstant + value:0 + } + } + } + layer { + name: "relu2" + type: kReLU + srclayers:"conv2" + } + layer { + name: "pool2" + type: kCPooling + srclayers: "relu2" + pooling_conf { + pool: AVG + kernel: 3 + stride: 2 + } + } + layer { + name: "norm2" + type: kLRN + lrn_conf { + 
local_size: 3 + alpha: 5e-05 + beta: 0.75 + } + srclayers:"pool2" + } + layer { + name: "conv3" + type: kCConvolution + srclayers: "norm2" + convolution_conf { + num_filters: 64 + kernel: 5 + stride: 1 + pad:2 + } + param { + name: "w3" + init { + type:kGaussian + std:0.01 + } + } + param { + name: "b3" + init { + type: kConstant + value:0 + } + } + } + layer { + name: "relu3" + type: kReLU + srclayers:"conv3" + } + layer { + name: "pool3" + type: kCPooling + srclayers: "relu3" + pooling_conf { + pool: AVG + kernel: 3 + stride: 2 + } + } + layer { + name: "ip1" + type: kInnerProduct + srclayers:"pool3" + innerproduct_conf { + num_output: 10 + } + param { + name: "w4" + wd_scale:250 + init { + type:kGaussian + std:0.01 + } + } + param { + name: "b4" + lr_scale:2.0 + wd_scale:0 + init { + type: kConstant + value:0 + } + } + } +# layer { +# name : "softmax" +# type: kSoftmax +# srclayers: "ip1" +# } +# +# layer { +# name : "argsort" +# type: kArgSort +# srclayers: "softmax" +# } + layer{ + name: "loss" + type: kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"ip1" + srclayers: "data" + } +# uncomment "softmax", "argsort", "output" layer and comment "loss" layer +# to extract features from argsort +# layer { +# name : "output" +# type: kCSVOutput +# srclayers: "argsort" +# store_conf { +# path: "examples/cifar10/out.csv" +# } +# } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + nworkers_per_group: 1 + nworkers_per_procs: 1 + workspace: "examples/cifar10" +} diff --git a/examples/mnist/gpu.conf b/examples/mnist/gpu.conf new file mode 100644 index 0000000000..41777a5aef --- /dev/null +++ b/examples/mnist/gpu.conf @@ -0,0 +1,243 @@ +name: "mlp" +train_steps: 1000 +test_steps:10 +test_freq:500 +disp_freq:100 +gpu:0 + +train_one_batch { + alg: kBP +} +updater{ + type: kSGD + learning_rate{ + type : kStep + base_lr: 0.001 + step_conf{ + change_freq: 60 + gamma: 0.997 + } + } +} + +neuralnet { + layer { + name: "data" + type: kRecordInput + store_conf { + 
backend: "kvfile" + path: "examples/mnist/train_data.bin" + random_skip: 5000 + batchsize: 64 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTrain + } + + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/test_data.bin" + batchsize: 100 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTest + } + + layer{ + name: "fc1" + type: kInnerProduct + srclayers:"data" + innerproduct_conf{ + num_output: 2500 + } + param{ + name: "w1" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b1" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh1" + type: kSTanh + srclayers:"fc1" + } + layer{ + name: "fc2" + type: kInnerProduct + srclayers:"tanh1" + innerproduct_conf{ + num_output: 2000 + } + param{ + name: "w2" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b2" + init { + type: kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh2" + type: kSTanh + srclayers:"fc2" + } + layer{ + name: "fc3" + type: kInnerProduct + srclayers:"tanh2" + innerproduct_conf{ + num_output: 1500 + } + param{ + name: "w3" + init{ + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b3" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh3" + type: kSTanh + srclayers:"fc3" + } + layer{ + name: "fc4" + type: kInnerProduct + srclayers:"tanh3" + innerproduct_conf{ + num_output: 1000 + } + param{ + name: "w4" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b4" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh4" + type: kSTanh + srclayers:"fc4" + } + layer{ + name: "fc5" + type: kInnerProduct + srclayers:"tanh4" + innerproduct_conf{ + num_output: 500 + } + param{ + name: "w5" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b5" + init { + type : kUniform + low: -0.05 
+ high:0.05 + } + } + } + + layer{ + name: "tanh5" + type: kSTanh + srclayers:"fc5" + } + layer{ + name: "fc6" + type: kInnerProduct + srclayers:"tanh5" + innerproduct_conf{ + num_output: 10 + } + param{ + name: "w6" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b6" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + layer{ + name: "loss" + type:kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"fc6" + srclayers:"data" + } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + workspace: "examples/mnist" +} diff --git a/examples/mnist/hybrid.conf b/examples/mnist/hybrid.conf index a0706f7107..37bfbdab1b 100644 --- a/examples/mnist/hybrid.conf +++ b/examples/mnist/hybrid.conf @@ -1,8 +1,8 @@ name: "mlp" train_steps: 1000 test_steps:0 -test_freq:60 -disp_freq:10 +test_freq:500 +disp_freq:100 train_one_batch { alg: kBP diff --git a/examples/mnist/hybrid_gpu.conf b/examples/mnist/hybrid_gpu.conf new file mode 100644 index 0000000000..ead54e9849 --- /dev/null +++ b/examples/mnist/hybrid_gpu.conf @@ -0,0 +1,257 @@ +name: "mlp" +train_steps: 1000 +test_steps:0 +test_freq:500 +disp_freq:100 +gpu:0 + +train_one_batch { + alg: kBP +} + +updater{ + type: kSGD + learning_rate{ + type : kStep + base_lr: 0.001 + step_conf{ + change_freq: 60 + gamma: 0.997 + } + } +} + +neuralnet { + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/train_data.bin" + random_skip: 5000 + batchsize: 64 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTrain + } + + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/test_data.bin" + batchsize: 100 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTest + } + + layer{ + name: "fc1" + partition_dim: 0 + type: kInnerProduct + srclayers:"data" + innerproduct_conf{ + num_output: 2500 + } + param{ + name: "w1" + init { + type: kUniform + low:-0.05 + high:0.05 + } + 
} + param{ + name: "b1" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh1" + partition_dim: 0 + type: kSTanh + srclayers:"fc1" + } + layer{ + name: "fc2" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh1" + innerproduct_conf{ + num_output: 2000 + } + param{ + name: "w2" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b2" + init { + type: kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh2" + partition_dim: 0 + type: kSTanh + srclayers:"fc2" + } + layer{ + name: "fc3" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh2" + innerproduct_conf{ + num_output: 1500 + } + param{ + name: "w3" + init{ + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b3" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh3" + partition_dim: 0 + type: kSTanh + srclayers:"fc3" + } + layer{ + name: "fc4" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh3" + innerproduct_conf{ + num_output: 1000 + } + param{ + name: "w4" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b4" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh4" + partition_dim: 0 + type: kSTanh + srclayers:"fc4" + } + layer{ + name: "fc5" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh4" + innerproduct_conf{ + num_output: 500 + } + param{ + name: "w5" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b5" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh5" + partition_dim: 0 + type: kSTanh + srclayers:"fc5" + } + layer{ + name: "fc6" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh5" + innerproduct_conf{ + num_output: 10 + } + param{ + name: "w6" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b6" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + layer{ + name: "loss" 
+ type:kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"fc6" + srclayers:"data" + } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + nworkers_per_group: 2 + nworkers_per_procs: 2 + workspace: "examples/mnist" +} diff --git a/examples/mnist/hybrid_gpux2.conf b/examples/mnist/hybrid_gpux2.conf new file mode 100644 index 0000000000..692e4222dd --- /dev/null +++ b/examples/mnist/hybrid_gpux2.conf @@ -0,0 +1,258 @@ +name: "mlp" +train_steps: 1000 +test_steps:0 +test_freq:500 +disp_freq:100 +gpu:0 +gpu:1 + +train_one_batch { + alg: kBP +} + +updater{ + type: kSGD + learning_rate{ + type : kStep + base_lr: 0.001 + step_conf{ + change_freq: 60 + gamma: 0.997 + } + } +} + +neuralnet { + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/train_data.bin" + random_skip: 5000 + batchsize: 64 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTrain + } + + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/test_data.bin" + batchsize: 100 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTest + } + + layer{ + name: "fc1" + partition_dim: 0 + type: kInnerProduct + srclayers:"data" + innerproduct_conf{ + num_output: 2500 + } + param{ + name: "w1" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b1" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh1" + partition_dim: 0 + type: kSTanh + srclayers:"fc1" + } + layer{ + name: "fc2" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh1" + innerproduct_conf{ + num_output: 2000 + } + param{ + name: "w2" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b2" + init { + type: kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh2" + partition_dim: 0 + type: kSTanh + srclayers:"fc2" + } + layer{ + name: "fc3" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh2" + 
innerproduct_conf{ + num_output: 1500 + } + param{ + name: "w3" + init{ + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b3" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh3" + partition_dim: 0 + type: kSTanh + srclayers:"fc3" + } + layer{ + name: "fc4" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh3" + innerproduct_conf{ + num_output: 1000 + } + param{ + name: "w4" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b4" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh4" + partition_dim: 0 + type: kSTanh + srclayers:"fc4" + } + layer{ + name: "fc5" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh4" + innerproduct_conf{ + num_output: 500 + } + param{ + name: "w5" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b5" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh5" + partition_dim: 0 + type: kSTanh + srclayers:"fc5" + } + layer{ + name: "fc6" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh5" + innerproduct_conf{ + num_output: 10 + } + param{ + name: "w6" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b6" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + layer{ + name: "loss" + type:kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"fc6" + srclayers:"data" + } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + nworkers_per_group: 2 + nworkers_per_procs: 2 + workspace: "examples/mnist" +} diff --git a/examples/mnist/job.conf b/examples/mnist/job.conf index 41d6b6fdb5..decd7bcf2a 100644 --- a/examples/mnist/job.conf +++ b/examples/mnist/job.conf @@ -1,8 +1,10 @@ name: "mlp" train_steps: 1000 test_steps:10 -test_freq:60 -disp_freq:10 +test_freq:500 +disp_freq:100 + + train_one_batch { alg: kBP } diff --git a/src/driver.cc b/src/driver.cc index 1e4929f8a0..d03658e3e0 100644 --- 
a/src/driver.cc +++ b/src/driver.cc @@ -244,6 +244,7 @@ void Driver::Train(const JobProto& job_conf) { threads.push_back(std::thread(&Worker::Run, worker)); int device_id = -1; if (gpu < job_conf.gpu_size()) { + LOG(ERROR) << "Creating GPU..."; device_id = job_conf.gpu(gpu++); } context->SetupDevice(threads.back().get_id(), device_id); diff --git a/src/worker.cc b/src/worker.cc index 2afa8b06fe..a4184aae07 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -62,6 +62,7 @@ Worker::~Worker() { } void Worker::Run() { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // setup gpu device auto context = Singleton::Instance(); int device = context->device_id(std::this_thread::get_id()); From 32a663ed9906863a68755581db6ce6fa9f77313d Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sat, 12 Mar 2016 00:23:05 +0800 Subject: [PATCH 04/18] Modified ClusterProto to support GPU information. --- src/proto/job.proto | 1 + 1 file changed, 1 insertion(+) diff --git a/src/proto/job.proto b/src/proto/job.proto index 7bc0ea3b9b..3103d08020 100644 --- a/src/proto/job.proto +++ b/src/proto/job.proto @@ -166,6 +166,7 @@ message ClusterProto { optional int32 nservers_per_group = 4 [default = 1]; optional int32 nworkers_per_procs = 5 [default = 1]; optional int32 nservers_per_procs = 6 [default = 1]; + optional int32 ngpu_per_group = 7 [default=0]; // local workspace for checkpoint files and vis files //required string workspace = 10; optional string workspace = 10; From dddac210dd7a617f96f2ee02fc4490e435754644 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sat, 12 Mar 2016 00:26:59 +0800 Subject: [PATCH 05/18] Modified common.proto to add enum for DeviceType. 
--- src/proto/common.proto | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/proto/common.proto b/src/proto/common.proto index b1ba1b686f..514ed1630e 100644 --- a/src/proto/common.proto +++ b/src/proto/common.proto @@ -50,6 +50,12 @@ enum ConnectionType { kOneToMany = 2; } +enum DeviceType { + kCPU = 0; + kGPU = 1; + kOther = 2; +} + // to import caffe's lmdb dataset message CaffeDatum { optional int32 channels = 1; From 8cc7fa51b4e9c2fbd6dcb3db3778105a09a1da3b Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sat, 12 Mar 2016 00:30:47 +0800 Subject: [PATCH 06/18] Slightly modified worker.h for now. --- include/singa/worker.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/singa/worker.h b/include/singa/worker.h index 34c8000371..0b8a8490d1 100644 --- a/include/singa/worker.h +++ b/include/singa/worker.h @@ -263,7 +263,7 @@ class Worker { */ inline int grp_id() const { return grp_id_; } /** - * @reutrn worker ID within the worker group. + * @return worker ID within the worker group. */ inline int id() const { return id_; } @@ -271,6 +271,7 @@ class Worker { int grp_id_ = -1, id_ = -1; int step_ = 0; JobProto job_conf_; + DeviceType device_type; NeuralNet* train_net_ = nullptr; NeuralNet* test_net_ = nullptr; NeuralNet* val_net_ = nullptr; From 681ebba157867392dfc20b5b321d740f9cd26bf7 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sat, 12 Mar 2016 14:29:08 +0800 Subject: [PATCH 07/18] Added DeviceType across Driver, Worker and JobProto. 
--- include/singa/utils/cluster.h | 1 + include/singa/utils/context.h | 4 +-- include/singa/worker.h | 18 +++++++++++++- src/driver.cc | 40 ++++++++++++++++++++---------- src/neuralnet/input_layer/store.cc | 1 + src/proto/job.proto | 4 ++- src/worker.cc | 9 ++++++- 7 files changed, 59 insertions(+), 18 deletions(-) diff --git a/include/singa/utils/cluster.h b/include/singa/utils/cluster.h index c1dc93bdc8..2114e9293c 100644 --- a/include/singa/utils/cluster.h +++ b/include/singa/utils/cluster.h @@ -53,6 +53,7 @@ class Cluster { inline int nservers_per_group() const { return cluster_.nservers_per_group();} inline int nworkers_per_procs() const { return cluster_.nworkers_per_procs();} inline int nservers_per_procs() const { return cluster_.nservers_per_procs();} + inline int ngpu_per_group() const { return cluster_.ngpu_per_group();} inline int nworker_groups_per_server_group() const { if (nserver_groups() == 0 || nservers_per_group() == 0) return 1; diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index b1128c1fb1..1942c39341 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -145,7 +145,7 @@ class Context { } /** - * \copybreif rand_generator(const std::thread::id&); + * \copybrief rand_generator(const std::thread::id&); * @return the CPU random generator for the calling thread. */ std::mt19937* rand_generator() { @@ -170,7 +170,7 @@ class Context { } #ifdef USE_GPU /** - * \copybreif cublas_handle_(const std::thread::id&); + * \copybrief cublas_handle_(const std::thread::id&); * @return cublas handle for the calling thread. */ cublasHandle_t cublas_handle() { diff --git a/include/singa/worker.h b/include/singa/worker.h index 0b8a8490d1..086a5d4a2f 100644 --- a/include/singa/worker.h +++ b/include/singa/worker.h @@ -62,6 +62,12 @@ class Worker { * @return a pointer to the instance of the Worker subclass. 
*/ static Worker* Create(const AlgProto& conf); + /** + * Create an instance of the subclass of Worker + * with the specified device type. + */ + static Worker* Create(const AlgProto& conf, DeviceType devtype); + virtual ~Worker(); /** * @param[in] grp_id global worker group ID @@ -267,11 +273,21 @@ class Worker { */ inline int id() const { return id_; } + /** + * @return The type of device this thread currently is assigned to. + */ + inline DeviceType device_type() { return device_type_; } + + inline DeviceType device_type(DeviceType devtype) { + device_type_ = devtype; + return device_type_; + } + protected: int grp_id_ = -1, id_ = -1; int step_ = 0; JobProto job_conf_; - DeviceType device_type; + DeviceType device_type_; NeuralNet* train_net_ = nullptr; NeuralNet* test_net_ = nullptr; NeuralNet* val_net_ = nullptr; diff --git a/src/driver.cc b/src/driver.cc index a37b12480f..ab5e8a192e 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -213,6 +213,9 @@ void Driver::Test(const JobProto& job_conf) { worker->Test(job_conf.test_steps(), kTest, net); } +////////// +// Main training function +////////// void Driver::Train(const JobProto& job_conf) { auto cluster = Cluster::Get(); int nserver_grps = cluster->nserver_groups(); @@ -246,12 +249,11 @@ void Driver::Train(const JobProto& job_conf) { // CHECK_LE(workers.size(), job_conf.gpu_size()); for (auto worker : workers) { threads.push_back(std::thread(&Worker::Run, worker)); - int device_id = -1; - if (gpu < job_conf.gpu_size()) { - LOG(ERROR) << "Creating GPU..."; - device_id = job_conf.gpu(gpu++); + if (worker->device_type() == DeviceType::kGPU) { + context->SetupDevice(threads.back().get_id(), gpu++); + } else { + context->SetupDevice(threads.back().get_id(), -1); } - context->SetupDevice(threads.back().get_id(), device_id); } if (grp_size > 1 || nserver_grps > 0) { int nservers_per_grp = cluster->nservers_per_group(); @@ -320,19 +322,26 @@ const vector Driver::CreateWorkers(const JobProto& job_conf, NeuralNet* net) 
{ auto cluster = Cluster::Get(); vector workers; - if (!cluster->has_worker()) return workers; - int wgrp_size = cluster->nworkers_per_group(); - int nservers_per_grp = cluster->nservers_per_group(); - int nserver_grps = cluster->nserver_groups(); + + if (!cluster->has_worker()) + return workers; + + int nservers_per_grp = cluster->nservers_per_group(), + nserver_grps = cluster->nserver_groups(), + ngpu_per_group = cluster->ngpu_per_group(); int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); + const vector rng = cluster->ExecutorRng(cluster->procs_id(), cluster->nworkers_per_group(), cluster->nworkers_per_procs()); int gstart = rng[0], gend = rng[1], wstart = rng[2], wend = rng[3]; + LOG(ERROR) << "gstart: " << gstart << " gend: " << gend + << " wstart: " << wstart << " wend: " << wend; + for (int gid = gstart; gid < gend; gid++) { - NeuralNet* train_net = nullptr, *test_net = nullptr, *val_net = nullptr; + NeuralNet* train_net = net, *test_net = nullptr, *val_net = nullptr; if (gid == gstart) { - train_net = net; Param::SliceParams(lcm, train_net->params()); + // test and validation are performed by the 1st group. if (gid == 0 && job_conf.test_steps() > 0) { test_net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); @@ -342,16 +351,20 @@ const vector Driver::CreateWorkers(const JobProto& job_conf, val_net = NeuralNet::Create(job_conf.neuralnet(), kVal, 1); val_net->ShareParamsFrom(train_net, false); } + } else { - train_net = NeuralNet::Create(job_conf.neuralnet(), kTrain, wgrp_size); if (cluster->share_memory()) { train_net->ShareParamsFrom(net, true); } else { Param::SliceParams(lcm, train_net->params()); } } + for (int wid = wstart; wid < wend; wid++) { - auto *worker = Worker::Create(job_conf.train_one_batch()); + // Set DeviceType to CPU or GPU here. + auto *worker = Worker::Create(job_conf.train_one_batch(), + (ngpu_per_group-- < 1)? 
DeviceType::kCPU : DeviceType::kGPU); + // TODO(wangwei) extend to test among workers in a grp if (wid == 0) worker->Setup(gid, wid, job_conf, train_net, val_net, test_net); @@ -384,6 +397,7 @@ const vector Driver::CreateServers(const JobProto& job_conf, const vector rng = cluster->ExecutorRng(server_procs, cluster->nservers_per_group(), cluster->nservers_per_procs()); int gstart = rng[0], gend = rng[1], start = rng[2], end = rng[3]; + for (int gid = gstart; gid < gend; gid++) { for (int sid = start; sid < end; sid++) { auto server = new Server(gid, sid, job_conf, slice2group, slice2server); diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc index 283b0c73a7..c6530b5874 100644 --- a/src/neuralnet/input_layer/store.cc +++ b/src/neuralnet/input_layer/store.cc @@ -38,6 +38,7 @@ void StoreInputLayer::Setup(const LayerProto& conf, if (conf.partition_dim() == 0) { batchsize_ /= conf.num_partitions(); } + LOG(ERROR) << "Ratio: " << layer_conf_.store_conf().ratio(); } void StoreInputLayer::ComputeFeature(int flag, diff --git a/src/proto/job.proto b/src/proto/job.proto index 3103d08020..b40f793ba1 100644 --- a/src/proto/job.proto +++ b/src/proto/job.proto @@ -166,7 +166,8 @@ message ClusterProto { optional int32 nservers_per_group = 4 [default = 1]; optional int32 nworkers_per_procs = 5 [default = 1]; optional int32 nservers_per_procs = 6 [default = 1]; - optional int32 ngpu_per_group = 7 [default=0]; + optional int32 ngpu_per_group = 7 [default = 0]; + // local workspace for checkpoint files and vis files //required string workspace = 10; optional string workspace = 10; @@ -380,6 +381,7 @@ message StoreProto { optional bool encoded = 10 [default = false]; optional int32 random_skip = 11 [default = 0]; optional bool has_label = 12 [default = true]; + optional int32 ratio = 13 [default = 1]; } message CharRNNProto { optional string path = 1; diff --git a/src/worker.cc b/src/worker.cc index a4184aae07..b4ddf6acb6 100644 --- a/src/worker.cc 
+++ b/src/worker.cc @@ -36,12 +36,17 @@ namespace singa { using std::string; Worker* Worker::Create(const AlgProto& conf) { + return Create(conf, DeviceType::kCPU); +} + +Worker* Worker::Create(const AlgProto& conf, DeviceType devtype) { auto factory = Singleton>::Instance(); Worker* worker = nullptr; if (conf.has_user_alg()) worker = factory->Create(conf.user_alg()); else worker = factory->Create(conf.alg()); + worker->device_type(devtype); return worker; } @@ -68,8 +73,10 @@ void Worker::Run() { int device = context->device_id(std::this_thread::get_id()); LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " << " start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU"); - if (device >= 0) + LOG(ERROR) << "Device ID: " << device << " DeviceType: " << this->device_type(); + if (this->device_type_ == DeviceType::kGPU) { context->ActivateDevice(device); + } auto cluster = Cluster::Get(); int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group(); From 64b43e276517a1343cabdbe9e3b737f7c0ad95b6 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sat, 12 Mar 2016 20:15:53 +0800 Subject: [PATCH 08/18] Fix the worker sleep hack into a more proper solution. --- include/singa/worker.h | 17 +++++----- src/driver.cc | 77 +++++++++++++++++++++--------------------- src/worker.cc | 14 ++++---- 3 files changed, 53 insertions(+), 55 deletions(-) diff --git a/include/singa/worker.h b/include/singa/worker.h index 086a5d4a2f..da7e9c0030 100644 --- a/include/singa/worker.h +++ b/include/singa/worker.h @@ -66,7 +66,7 @@ class Worker { * Create an instance of the subclass of Worker * with the specified device type. */ - static Worker* Create(const AlgProto& conf, DeviceType devtype); + static Worker* Create(const AlgProto& conf, int devid); virtual ~Worker(); /** @@ -274,20 +274,21 @@ class Worker { inline int id() const { return id_; } /** - * @return The type of device this thread currently is assigned to. + * @return The device ID of this worker. 
It is -1 for CPU and >-1 for GPU. */ - inline DeviceType device_type() { return device_type_; } + inline int device_id() { return device_id_; } - inline DeviceType device_type(DeviceType devtype) { - device_type_ = devtype; - return device_type_; - } + /** + * @param devid The ID value to assign to this worker. + */ + inline void device_id(int devid) { device_id_ = devid; } protected: int grp_id_ = -1, id_ = -1; int step_ = 0; + bool execute_; JobProto job_conf_; - DeviceType device_type_; + int device_id_; NeuralNet* train_net_ = nullptr; NeuralNet* test_net_ = nullptr; NeuralNet* val_net_ = nullptr; diff --git a/src/driver.cc b/src/driver.cc index ab5e8a192e..25254bf9ee 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -164,6 +164,9 @@ void Driver::InitLog(char* arg) { google::InitGoogleLogging(arg); } +////////// +// TRAIN +////////// void Driver::Train(bool resume, const std::string str) { JobProto job_conf; job_conf.ParseFromString(str); @@ -177,9 +180,9 @@ void Driver::Train(bool resume, const JobProto& job_conf) { Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); tinydir_dir workspace; if (tinydir_open(&workspace, job_conf.cluster().workspace().c_str()) == -1) - LOG(FATAL) << "workspace not exist: " << job_conf.cluster().workspace(); + LOG(FATAL) << "Workspace does not exist: " << job_conf.cluster().workspace(); if (job_conf.num_openblas_threads() != 1) - LOG(WARNING) << "openblas luanches " + LOG(WARNING) << "Openblas launched " << job_conf.num_openblas_threads() << " threads"; openblas_set_num_threads(job_conf.num_openblas_threads()); @@ -191,31 +194,6 @@ void Driver::Train(bool resume, const JobProto& job_conf) { Train(job); } -void Driver::Test(const std::string str) { - JobProto job_conf; - job_conf.ParseFromString(str); - Test(job_conf); -} - -void Driver::Test(const JobProto& job_conf) { - Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); - Cluster::Get()->Register(getpid(), "localhost"); - // TODO(wangwei) extend to a group with 
multiple workers - auto worker = Worker::Create(job_conf.train_one_batch()); - worker->Setup(0, 0, job_conf, nullptr, nullptr, nullptr); - auto net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); - WriteStringToTextFile(Cluster::Get()->vis_folder() + "/test_net.json", - net->ToGraph(true).ToJson()); - vector paths; - for (const auto& p : job_conf.checkpoint_path()) - paths.push_back(p); - net->Load(paths); - worker->Test(job_conf.test_steps(), kTest, net); -} - -////////// -// Main training function -////////// void Driver::Train(const JobProto& job_conf) { auto cluster = Cluster::Get(); int nserver_grps = cluster->nserver_groups(); @@ -244,22 +222,15 @@ void Driver::Train(const JobProto& job_conf) { vector threads; for (auto server : servers) threads.push_back(std::thread(&Server::Run, server)); - int gpu = 0; - auto context = Singleton::Instance(); - // CHECK_LE(workers.size(), job_conf.gpu_size()); - for (auto worker : workers) { + for (auto worker : workers) threads.push_back(std::thread(&Worker::Run, worker)); - if (worker->device_type() == DeviceType::kGPU) { - context->SetupDevice(threads.back().get_id(), gpu++); - } else { - context->SetupDevice(threads.back().get_id(), -1); - } - } + if (grp_size > 1 || nserver_grps > 0) { int nservers_per_grp = cluster->nservers_per_group(); int lcm = LeastCommonMultiple(nservers_per_grp, nserver_grps); auto slices = Param::ComputeSlices(lcm, net->params()); auto slice2server = PartitionSlices(nservers_per_grp, slices); + stub.Run(slice2server, workers, servers); } @@ -279,6 +250,34 @@ void Driver::Train(const JobProto& job_conf) { } } +////////// +// TEST +////////// +void Driver::Test(const std::string str) { + JobProto job_conf; + job_conf.ParseFromString(str); + Test(job_conf); +} + +void Driver::Test(const JobProto& job_conf) { + Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); + Cluster::Get()->Register(getpid(), "localhost"); + + // TODO(wangwei) extend to a group with multiple workers + auto worker = 
Worker::Create(job_conf.train_one_batch()); + worker->Setup(0, 0, job_conf, nullptr, nullptr, nullptr); + + auto net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); + WriteStringToTextFile(Cluster::Get()->vis_folder() + "/test_net.json", + net->ToGraph(true).ToJson()); + + vector paths; + for (const auto& p : job_conf.checkpoint_path()) + paths.push_back(p); + net->Load(paths); + worker->Test(job_conf.test_steps(), kTest, net); +} + void Driver::SetupForResume(JobProto* job_conf) { tinydir_dir dir; std::string folder = Cluster::Get()->checkpoint_folder(); @@ -363,8 +362,8 @@ const vector Driver::CreateWorkers(const JobProto& job_conf, for (int wid = wstart; wid < wend; wid++) { // Set DeviceType to CPU or GPU here. auto *worker = Worker::Create(job_conf.train_one_batch(), - (ngpu_per_group-- < 1)? DeviceType::kCPU : DeviceType::kGPU); - + (ngpu_per_group > 0) ? job_conf.gpu(ngpu_per_group-1) : -1); + ngpu_per_group--; // TODO(wangwei) extend to test among workers in a grp if (wid == 0) worker->Setup(gid, wid, job_conf, train_net, val_net, test_net); diff --git a/src/worker.cc b/src/worker.cc index b4ddf6acb6..5a52d0af94 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -36,17 +36,17 @@ namespace singa { using std::string; Worker* Worker::Create(const AlgProto& conf) { - return Create(conf, DeviceType::kCPU); + return Create(conf, -1); } -Worker* Worker::Create(const AlgProto& conf, DeviceType devtype) { +Worker* Worker::Create(const AlgProto& conf, int devid) { auto factory = Singleton>::Instance(); Worker* worker = nullptr; if (conf.has_user_alg()) worker = factory->Create(conf.user_alg()); else worker = factory->Create(conf.alg()); - worker->device_type(devtype); + worker->device_id(devid); return worker; } @@ -67,16 +67,14 @@ Worker::~Worker() { } void Worker::Run() { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - // setup gpu device auto context = Singleton::Instance(); + context->SetupDevice(std::this_thread::get_id(), device_id_); + 
int device = context->device_id(std::this_thread::get_id()); LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " << " start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU"); - LOG(ERROR) << "Device ID: " << device << " DeviceType: " << this->device_type(); - if (this->device_type_ == DeviceType::kGPU) { + if (this->device_id_ > -1) context->ActivateDevice(device); - } auto cluster = Cluster::Get(); int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group(); From 884fc356258ddd714a3b33c3a709b859f7b540ae Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sun, 13 Mar 2016 00:20:14 +0800 Subject: [PATCH 09/18] Added 2x2 conf file for mnist. --- examples/mnist/2x2.conf | 261 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 261 insertions(+) create mode 100644 examples/mnist/2x2.conf diff --git a/examples/mnist/2x2.conf b/examples/mnist/2x2.conf new file mode 100644 index 0000000000..d0da46ad3c --- /dev/null +++ b/examples/mnist/2x2.conf @@ -0,0 +1,261 @@ +name: "mlp" +train_steps: 1000 +test_steps:0 +test_freq:500 +disp_freq:100 +gpu:0 +gpu:1 + +train_one_batch { + alg: kBP +} + +updater{ + type: kSGD + learning_rate{ + type : kStep + base_lr: 0.001 + step_conf{ + change_freq: 60 + gamma: 0.997 + } + } +} + +neuralnet { + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/train_data.bin" + random_skip: 5000 + batchsize: 64 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + ratio: 5 + } + include: kTrain + } + + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/test_data.bin" + batchsize: 100 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTest + } + + layer{ + name: "fc1" + partition_dim: 0 + type: kInnerProduct + srclayers:"data" + innerproduct_conf{ + num_output: 2500 + } + param{ + name: "w1" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b1" + init { + type : 
kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh1" + partition_dim: 0 + type: kSTanh + srclayers:"fc1" + } + layer{ + name: "fc2" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh1" + innerproduct_conf{ + num_output: 2000 + } + param{ + name: "w2" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b2" + init { + type: kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh2" + partition_dim: 0 + type: kSTanh + srclayers:"fc2" + } + layer{ + name: "fc3" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh2" + innerproduct_conf{ + num_output: 1500 + } + param{ + name: "w3" + init{ + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b3" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh3" + partition_dim: 0 + type: kSTanh + srclayers:"fc3" + } + layer{ + name: "fc4" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh3" + innerproduct_conf{ + num_output: 1000 + } + param{ + name: "w4" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b4" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh4" + partition_dim: 0 + type: kSTanh + srclayers:"fc4" + } + layer{ + name: "fc5" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh4" + innerproduct_conf{ + num_output: 500 + } + param{ + name: "w5" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b5" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh5" + partition_dim: 0 + type: kSTanh + srclayers:"fc5" + } + layer{ + name: "fc6" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh5" + innerproduct_conf{ + num_output: 10 + } + param{ + name: "w6" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b6" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + layer{ + name: "loss" + type:kSoftmaxLoss + softmaxloss_conf{ + 
topk:1 + } + srclayers:"fc6" + srclayers:"data" + } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + nworkers_per_group: 4 + nworkers_per_procs: 4 + ngpu_per_group:2 + workspace: "examples/mnist" +} + From 92bda991eec2adf1cf9d6a8511c0bad5db6b5389 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sun, 13 Mar 2016 00:28:27 +0800 Subject: [PATCH 10/18] C++ Linting. --- include/singa/driver.h | 8 ++++---- src/worker.cc | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/include/singa/driver.h b/include/singa/driver.h index fb5a33a8c7..797beb23d1 100644 --- a/include/singa/driver.h +++ b/include/singa/driver.h @@ -18,8 +18,8 @@ * under the License. * *************************************************************/ -#ifndef SINGA_SINGA_DRIVER_H_ -#define SINGA_SINGA_DRIVER_H_ +#ifndef SINGA_DRIVER_H_ +#define SINGA_DRIVER_H_ #include #include "singa/proto/job.pb.h" @@ -74,7 +74,7 @@ class Driver { * files. * @param[in] str serialized string recorded job configuration. */ - void Train(bool resume, const std::string str); + void Train(bool resume, const std::string str); /** * Create workers and servers to conduct the training. * @@ -259,4 +259,4 @@ int Driver::RegisterWorker(const Type& type) { } // namespace singa -#endif // SINGA_SINGA_DRIVER_H_ +#endif // SINGA_DRIVER_H_ diff --git a/src/worker.cc b/src/worker.cc index 5a52d0af94..0f066b3cad 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -22,8 +22,6 @@ #include "singa/worker.h" #include -#include -#include #include #include "singa/utils/cluster.h" #include "singa/utils/factory.h" From 9837faa6d9de34a52ca81b6e8ca44f4729576b9d Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sun, 13 Mar 2016 23:22:48 +0800 Subject: [PATCH 11/18] Implemented thread pool. 
--- include/singa/utils/context.h | 9 +- include/singa/utils/threadpool.h | 183 +++++++++++++++++++++++++++++ src/driver.cc | 17 ++- src/neuralnet/input_layer/store.cc | 2 +- src/worker.cc | 7 +- 5 files changed, 206 insertions(+), 12 deletions(-) create mode 100644 include/singa/utils/threadpool.h diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index 1942c39341..01cea6e1e6 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -23,6 +23,7 @@ #define SINGA_UTILS_CONTEXT_H_ #include +#include #include #include #include @@ -132,10 +133,16 @@ class Context { void SetupDevice(const std::thread::id& tid, const int did, const int seed) { device_id_[tid] = did; seed_[tid] = seed; +#ifdef USE_GPU + if (did > -1) { + cudaSetDevice(did); + } +#endif } /** * Activate the GPU device by calling cudaSetDevice. + * TODO: Deprecate? No longer needed. */ void ActivateDevice(const int device_id) { CHECK_GE(device_id, 0); @@ -254,7 +261,7 @@ class Context { //!< max num of GPUs per process const int kMaxNumGPU = 64; //!< map from thread id to device id - std::unordered_map device_id_; + std::unordered_map device_id_; //!< map from thread id to cpu rand generator std::unordered_map rand_generator_; //!< map from thread id to cpu rand generator seed diff --git a/include/singa/utils/threadpool.h b/include/singa/utils/threadpool.h new file mode 100644 index 0000000000..9f1cf43259 --- /dev/null +++ b/include/singa/utils/threadpool.h @@ -0,0 +1,183 @@ +#ifndef SINGA_THREADPOOL_H_ +#define SINGA_THREADPOOL_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace singa { + +/** + * Stores an arbitrary number of threads and facilitates the + * queueing and distribution of arbitrary tasks to various + * threads pooled in the thread pool. 
In the event that a task + * throws an exception, if the exception type is derived from + * std::exception then it will be logged, otherwise ignored, and + * the worker thread will return to the thread pool. Undefined + * behaviour occurs whenever an operation on the mutex object + * throws an exception. + */ +class Threadpool final { +public: + /** + * Create a thread pool with the specified size and initialize + * each worker thread. Provides basic exception safety. + */ + explicit Threadpool( const std::size_t size ); + + /** + * Destroy a thread pool. Must be called from a thread not managed + * by this thread pool. + */ + ~Threadpool(); + + // Enqueue the specified task. Provides basic exception safety. + template + auto enqueue( Fn &&fn, Args && ...args ) + ->std::future::type>; + + /** + * Retrieve the number of threads currently managed by this + * thread pool. + */ + int size() { return this->threads_.size(); } + +private: + // Type of a task. + using task_t = std::function; + + // A list of worker threads. + std::vector threads_; + // A queue of tasks to be distributed and invoked. + std::queue task_queue_; + // A mutex for atomically getting and setting this threadpool's fields. + std::mutex queue_mutex_; + // Condition used to lock each worker thread until its been notified + // that a potential task is ready to be assigned to it and invoked. + std::condition_variable cond_handle_task_; + // Determine if this thread pool is being destroyed or there is at least + // one queued tasks to be distributed. + std::function ready_task_; + + // State of the thread pool. + bool stop; + + Threadpool( const Threadpool & ) = delete; // Copy constructor disabled. + Threadpool( Threadpool && ) = delete; // Move constructor disabled. + Threadpool &operator=( const Threadpool & ) = delete; // Copy assignment operator disabled. + Threadpool &operator=( Threadpool && ) = delete; // Move assignment operator disabled. 
+}; + +// Constructor +inline Threadpool::Threadpool( const size_t size ) : stop( false ) { + // Set the target function for the ready task predicate. + this->ready_task_ = [this] { + return this->stop || !this->task_queue_.empty(); + }; + + // Create the specified number of new worker threads. + for ( size_t k = 0; k < size; ++k ) { + this->threads_.emplace_back( [this] { + // This worker thread's assigned task. + task_t task; + // Synchronization state of this worker thread. + bool synchronize = false; + + // Iterate until this worker thread is to be synchronized. + do { + { + // Block this worker thread until it can accept a task from the + // thread pool. + std::unique_lock lock( this->queue_mutex_ ); + + this->cond_handle_task_.wait( lock, this->ready_task_ ); + + // Determine if we are destroying this thread pool and the tasks + // queue is empty. If so, synchronize this worker thread, otherwise + // dequeue a task and assign it to this worker thread. + if ( this->stop && this->task_queue_.empty() ) { + synchronize = true; + } else { + task = std::move( this->task_queue_.front() ); + this->task_queue_.pop(); + } + } + + // Determine if we are not synchronizing this worker thread. If so, + // invoke this worker thread's assigned task. + if ( !synchronize ) { + try { + task(); + } catch ( std::exception & ) { + // Log the exception caught by the worker thread's assigned task. + } catch ( ... ) {} + } + } while ( !synchronize ); + }); + } +} + +// Destructor +inline Threadpool::~Threadpool() { + // Change the state of the thread pool to "being destroyed". + { + std::lock_guard lock( this->queue_mutex_ ); + this->stop = true; + } + + // Notify all of the blocked worker threads of this state change. + this->cond_handle_task_.notify_all(); + + // Block the current thread until all of the worker threads from this thread + // pool are synchronized. 
+ for ( auto &thread : this->threads_ ) { + thread.join(); + } +} + +template +inline auto Threadpool::enqueue( Fn &&fn, Args && ...args ) + -> std::future::type> { + + using return_type = typename std::result_of::type; + + // Package the specified task in to a new copyable managed object and + // preserve each argument's value category. + auto task = std::make_shared>( + std::bind( std::forward( fn ), std::forward( args )... ) + ); + + // Get the promised future return value of the specified task. + std::future retval = task->get_future(); + + // Enqueue the specified task. + { + std::unique_lock lock( this->queue_mutex_ ); + + // Determine if we are preparing to destroy this thread pool. If so, throw + // a runtime error exception. + if ( this->stop ) { + throw std::runtime_error( "Cannot enqueue a task while this thread pool " + "is preparing to be destroyed." ); + } + + //this->task_queue_.emplace( [task = std::move( task )]{ (*task)(); } ); + this->task_queue_.emplace( [task](){ (*task)(); } ); + } + + // Notify a worker thread that there is a new task to be invoked. 
+ this->cond_handle_task_.notify_one(); + + return retval; +} + +} // namespace singa + +#endif // SINGA_THREADPOOL_H_ diff --git a/src/driver.cc b/src/driver.cc index 25254bf9ee..86bc8c338b 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -30,6 +30,7 @@ #include "singa/utils/tinydir.h" #include "singa/utils/cluster.h" #include "singa/utils/context.h" +#include "singa/utils/threadpool.h" #include "singa/proto/job.pb.h" #include "singa/server.h" #include "singa/stub.h" @@ -196,6 +197,10 @@ void Driver::Train(bool resume, const JobProto& job_conf) { void Driver::Train(const JobProto& job_conf) { auto cluster = Cluster::Get(); + + Threadpool threadpool(cluster->nserver_groups() * cluster->nservers_per_group() + + cluster->nworker_groups() * cluster->nworkers_per_group()); + int nserver_grps = cluster->nserver_groups(); int grp_size = cluster->nworkers_per_group(); Stub stub; @@ -219,11 +224,13 @@ void Driver::Train(const JobProto& job_conf) { MPIQueues.push_back(make_shared()); #endif - vector threads; +// vector threads; for (auto server : servers) - threads.push_back(std::thread(&Server::Run, server)); + threadpool.enqueue(&Server::Run, server); +// threads.push_back(std::thread(&Server::Run, server)); for (auto worker : workers) - threads.push_back(std::thread(&Worker::Run, worker)); + threadpool.enqueue(&Worker::Run, worker); +// threads.push_back(std::thread(&Worker::Run, worker)); if (grp_size > 1 || nserver_grps > 0) { int nservers_per_grp = cluster->nservers_per_group(); @@ -234,8 +241,8 @@ void Driver::Train(const JobProto& job_conf) { stub.Run(slice2server, workers, servers); } - for (auto& thread : threads) - thread.join(); +// for (auto& thread : threads) +// thread.join(); for (auto server : servers) delete server; delete net; diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc index c6530b5874..dc3d379d8e 100644 --- a/src/neuralnet/input_layer/store.cc +++ b/src/neuralnet/input_layer/store.cc @@ -38,7 +38,7 @@ void 
StoreInputLayer::Setup(const LayerProto& conf, if (conf.partition_dim() == 0) { batchsize_ /= conf.num_partitions(); } - LOG(ERROR) << "Ratio: " << layer_conf_.store_conf().ratio(); + //LOG(ERROR) << "Ratio: " << layer_conf_.store_conf().ratio(); } void StoreInputLayer::ComputeFeature(int flag, diff --git a/src/worker.cc b/src/worker.cc index 0f066b3cad..bb9d1ef1c6 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -66,13 +66,10 @@ Worker::~Worker() { void Worker::Run() { auto context = Singleton::Instance(); - context->SetupDevice(std::this_thread::get_id(), device_id_); + context->SetupDevice(std::this_thread::get_id(), this->device_id_); - int device = context->device_id(std::this_thread::get_id()); LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " - << " start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU"); - if (this->device_id_ > -1) - context->ActivateDevice(device); + << " start on " << (device_id_ >= 0 ? "GPU " + std::to_string(device_id_) : "CPU"); auto cluster = Cluster::Get(); int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group(); From c84c087d4e60721b65f62b37b82f3b2050818d62 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Sun, 13 Mar 2016 23:25:53 +0800 Subject: [PATCH 12/18] ActivateDevice in Context has been removed because now we can always be sure that the did passed into SetupDevice is always correct. --- include/singa/utils/context.h | 11 ----------- src/driver.cc | 5 ----- 2 files changed, 16 deletions(-) diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index 01cea6e1e6..2218a413bd 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -140,17 +140,6 @@ class Context { #endif } - /** - * Activate the GPU device by calling cudaSetDevice. - * TODO: Deprecate? No longer needed. 
- */ - void ActivateDevice(const int device_id) { - CHECK_GE(device_id, 0); -#ifdef USE_GPU - cudaSetDevice(device_id); -#endif - } - /** * \copybrief rand_generator(const std::thread::id&); * @return the CPU random generator for the calling thread. diff --git a/src/driver.cc b/src/driver.cc index 86bc8c338b..3b463b255a 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -224,13 +224,10 @@ void Driver::Train(const JobProto& job_conf) { MPIQueues.push_back(make_shared()); #endif -// vector threads; for (auto server : servers) threadpool.enqueue(&Server::Run, server); -// threads.push_back(std::thread(&Server::Run, server)); for (auto worker : workers) threadpool.enqueue(&Worker::Run, worker); -// threads.push_back(std::thread(&Worker::Run, worker)); if (grp_size > 1 || nserver_grps > 0) { int nservers_per_grp = cluster->nservers_per_group(); @@ -241,8 +238,6 @@ void Driver::Train(const JobProto& job_conf) { stub.Run(slice2server, workers, servers); } -// for (auto& thread : threads) -// thread.join(); for (auto server : servers) delete server; delete net; From 3dbc22b1d22c2c04c6799229453adfbe79770271 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Mon, 14 Mar 2016 12:21:24 +0800 Subject: [PATCH 13/18] Revert "ActivateDevice in Context has been removed because now we can always be sure that the did passed into SetupDevice is always correct." This reverts commit c84c087d4e60721b65f62b37b82f3b2050818d62. --- include/singa/utils/context.h | 11 +++++++++++ src/driver.cc | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index 2218a413bd..01cea6e1e6 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -140,6 +140,17 @@ class Context { #endif } + /** + * Activate the GPU device by calling cudaSetDevice. + * TODO: Deprecate? No longer needed. 
+ */ + void ActivateDevice(const int device_id) { + CHECK_GE(device_id, 0); +#ifdef USE_GPU + cudaSetDevice(device_id); +#endif + } + /** * \copybrief rand_generator(const std::thread::id&); * @return the CPU random generator for the calling thread. diff --git a/src/driver.cc b/src/driver.cc index 3b463b255a..86bc8c338b 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -224,10 +224,13 @@ void Driver::Train(const JobProto& job_conf) { MPIQueues.push_back(make_shared()); #endif +// vector threads; for (auto server : servers) threadpool.enqueue(&Server::Run, server); +// threads.push_back(std::thread(&Server::Run, server)); for (auto worker : workers) threadpool.enqueue(&Worker::Run, worker); +// threads.push_back(std::thread(&Worker::Run, worker)); if (grp_size > 1 || nserver_grps > 0) { int nservers_per_grp = cluster->nservers_per_group(); @@ -238,6 +241,8 @@ void Driver::Train(const JobProto& job_conf) { stub.Run(slice2server, workers, servers); } +// for (auto& thread : threads) +// thread.join(); for (auto server : servers) delete server; delete net; From b3d00aa2d40e40f6c2acf45c603635927907b21f Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Tue, 15 Mar 2016 10:38:48 +0800 Subject: [PATCH 14/18] Replaced SetupDevice to chain call ActivateDevice. --- include/singa/utils/context.h | 13 ++++++++++--- include/singa/utils/threadpool.h | 30 +++++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index 2218a413bd..1afb854c85 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -133,10 +133,17 @@ class Context { void SetupDevice(const std::thread::id& tid, const int did, const int seed) { device_id_[tid] = did; seed_[tid] = seed; + + ActivateDevice(did); + } + + /** + * Activate the GPU device by calling cudaSetDevice. 
+ */ + void ActivateDevice(const int device_id) { + CHECK_GE(device_id, 0); #ifdef USE_GPU - if (did > -1) { - cudaSetDevice(did); - } + cudaSetDevice(device_id); #endif } diff --git a/include/singa/utils/threadpool.h b/include/singa/utils/threadpool.h index 9f1cf43259..362e303fbe 100644 --- a/include/singa/utils/threadpool.h +++ b/include/singa/utils/threadpool.h @@ -1,3 +1,24 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + #ifndef SINGA_THREADPOOL_H_ #define SINGA_THREADPOOL_H_ @@ -52,7 +73,6 @@ class Threadpool final { private: // Type of a task. using task_t = std::function; - // A list of worker threads. std::vector threads_; // A queue of tasks to be distributed and invoked. @@ -69,10 +89,10 @@ class Threadpool final { // State of the thread pool. bool stop; - Threadpool( const Threadpool & ) = delete; // Copy constructor disabled. - Threadpool( Threadpool && ) = delete; // Move constructor disabled. - Threadpool &operator=( const Threadpool & ) = delete; // Copy assignment operator disabled. - Threadpool &operator=( Threadpool && ) = delete; // Move assignment operator disabled. 
+ Threadpool( const Threadpool & ) = delete; // Disabled copy constructor. + Threadpool( Threadpool && ) = delete; // Disabled move constructor. + Threadpool &operator=( const Threadpool & ) = delete; // Disabled copy assignment. + Threadpool &operator=( Threadpool && ) = delete; // Disabled move assignment. }; // Constructor From 04bc1402003e07e1634bbe582a119d4cac39494b Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Tue, 15 Mar 2016 10:41:38 +0800 Subject: [PATCH 15/18] Removed commented out dead code. --- src/driver.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/driver.cc b/src/driver.cc index 86bc8c338b..3b463b255a 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -224,13 +224,10 @@ void Driver::Train(const JobProto& job_conf) { MPIQueues.push_back(make_shared()); #endif -// vector threads; for (auto server : servers) threadpool.enqueue(&Server::Run, server); -// threads.push_back(std::thread(&Server::Run, server)); for (auto worker : workers) threadpool.enqueue(&Worker::Run, worker); -// threads.push_back(std::thread(&Worker::Run, worker)); if (grp_size > 1 || nserver_grps > 0) { int nservers_per_grp = cluster->nservers_per_group(); @@ -241,8 +238,6 @@ void Driver::Train(const JobProto& job_conf) { stub.Run(slice2server, workers, servers); } -// for (auto& thread : threads) -// thread.join(); for (auto server : servers) delete server; delete net; From b81bd525836cf69e1ce2db15bec344a690592d40 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Tue, 15 Mar 2016 23:13:54 +0800 Subject: [PATCH 16/18] Transplanted worker setup logic from Driver to Worker. Errors exist. 
--- include/singa/driver.h | 3 +- include/singa/utils/context.h | 14 +------ include/singa/worker.h | 30 ++++++--------- src/driver.cc | 40 +++++++++++++++++--- src/stub.cc | 2 +- src/worker.cc | 71 ++++++++++++++++++++++++++--------- 6 files changed, 104 insertions(+), 56 deletions(-) diff --git a/include/singa/driver.h b/include/singa/driver.h index 797beb23d1..f0934e403d 100644 --- a/include/singa/driver.h +++ b/include/singa/driver.h @@ -125,7 +125,8 @@ class Driver { * @param[in] net training neural network. * @return worker instances */ - const vector CreateWorkers(const JobProto& job_conf, NeuralNet* net); +// const vector CreateWorkers(const JobProto& job_conf, NeuralNet* net); + const vector CreateWorkers(const JobProto& job_conf); /*********** Subclasses registers *************************/ diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index 9abbce909e..0fac1556d1 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -134,7 +134,8 @@ class Context { device_id_[tid] = did; seed_[tid] = seed; - ActivateDevice(did); + if (did > -1 ) + ActivateDevice(did); } /** @@ -147,17 +148,6 @@ class Context { #endif } - /** - * Activate the GPU device by calling cudaSetDevice. - * TODO: Deprecate? No longer needed. - */ - void ActivateDevice(const int device_id) { - CHECK_GE(device_id, 0); -#ifdef USE_GPU - cudaSetDevice(device_id); -#endif - } - /** * \copybrief rand_generator(const std::thread::id&); * @return the CPU random generator for the calling thread. diff --git a/include/singa/worker.h b/include/singa/worker.h index da7e9c0030..6f75a10dfe 100644 --- a/include/singa/worker.h +++ b/include/singa/worker.h @@ -58,20 +58,13 @@ class Worker { * -# Back-propagation for the feed-forward models, e.g., CNN and MLP, and the * recurrent neural networks. * -# Contrastive divergence for the energy models, e.g., RBM. - * + * TODO: Update this when it works. * @return a pointer to the instance of the Worker subclass. 
*/ - static Worker* Create(const AlgProto& conf); - /** - * Create an instance of the subclass of Worker - * with the specified device type. - */ - static Worker* Create(const AlgProto& conf, int devid); + static Worker* Create(const AlgProto& conf, int grp_id, int dev_id, int dev_type); virtual ~Worker(); /** - * @param[in] grp_id global worker group ID - * @param[in] id worker ID within the group * @param[in] conf job configuration * @param[in] train_net pointer to the training neural net, which could be * shared with other workers from the same group. Different workers run over @@ -83,8 +76,7 @@ class Worker { * first worker from the first group would have test neural net. All other * workers receive nullptr for this argument. */ - virtual void Setup(int grp_id, int id, const JobProto& conf, - NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net); + virtual void Setup(const JobProto& conf); /** * Main function of Worker. * @@ -267,28 +259,30 @@ class Worker { /** * @return group ID */ - inline int grp_id() const { return grp_id_; } + inline int grp_id() const { return this->grp_id_; } + inline void grp_id(int grp_id) { this->grp_id_ = grp_id; } /** * @return worker ID within the worker group. */ - inline int id() const { return id_; } + inline int dev_id() const { return this->id_; } + inline void dev_id(int id) { this->id_ = id; } /** - * @return The device ID of this worker. It is -1 for CPU and >-1 for GPU. + * @return The device type of this worker. It is -1 for CPU and >-1 for GPU. */ - inline int device_id() { return device_id_; } + inline int dev_type() const { return this->device_type_; } /** - * @param devid The ID value to assign to this worker. + * @param devid The type value to assign to this worker. 
*/ - inline void device_id(int devid) { device_id_ = devid; } + inline void dev_type(int dev_type) { this->device_type_ = dev_type; } protected: int grp_id_ = -1, id_ = -1; int step_ = 0; bool execute_; JobProto job_conf_; - int device_id_; + int device_type_; NeuralNet* train_net_ = nullptr; NeuralNet* test_net_ = nullptr; NeuralNet* val_net_ = nullptr; diff --git a/src/driver.cc b/src/driver.cc index 3b463b255a..f30b65800e 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -215,7 +215,8 @@ void Driver::Train(const JobProto& job_conf) { NeuralNet* net = NeuralNet::Create(job_conf.neuralnet(), kTrain, grp_size); WriteStringToTextFile(cluster->vis_folder() + "/train_net.json", net->ToGraph(true).ToJson()); - const vector workers = CreateWorkers(job_conf, net); +// const vector workers = CreateWorkers(job_conf, net); + const vector workers = CreateWorkers(job_conf); const vector servers = CreateServers(job_conf, net); #ifdef USE_MPI @@ -224,6 +225,11 @@ void Driver::Train(const JobProto& job_conf) { MPIQueues.push_back(make_shared()); #endif + // Setup the workers + for (auto worker: workers) + worker->Setup(job_conf); + + // Actual running for (auto server : servers) threadpool.enqueue(&Server::Run, server); for (auto worker : workers) @@ -266,8 +272,8 @@ void Driver::Test(const JobProto& job_conf) { Cluster::Get()->Register(getpid(), "localhost"); // TODO(wangwei) extend to a group with multiple workers - auto worker = Worker::Create(job_conf.train_one_batch()); - worker->Setup(0, 0, job_conf, nullptr, nullptr, nullptr); + auto worker = Worker::Create(job_conf.train_one_batch(), 0, 0, -1); + worker->Setup(job_conf); auto net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); WriteStringToTextFile(Cluster::Get()->vis_folder() + "/test_net.json", @@ -319,6 +325,7 @@ void Driver::SetupForResume(JobProto* job_conf) { tinydir_close(&dir); } +/* const vector Driver::CreateWorkers(const JobProto& job_conf, NeuralNet* net) { auto cluster = Cluster::Get(); @@ -363,14 
+370,35 @@ const vector Driver::CreateWorkers(const JobProto& job_conf, for (int wid = wstart; wid < wend; wid++) { // Set DeviceType to CPU or GPU here. - auto *worker = Worker::Create(job_conf.train_one_batch(), + auto *worker = Worker::Create(job_conf.train_one_batch(), gid, wid, (ngpu_per_group > 0) ? job_conf.gpu(ngpu_per_group-1) : -1); ngpu_per_group--; // TODO(wangwei) extend to test among workers in a grp if (wid == 0) - worker->Setup(gid, wid, job_conf, train_net, val_net, test_net); + worker->Setup(job_conf, train_net, val_net, test_net); else - worker->Setup(gid, wid, job_conf, train_net, nullptr, nullptr); + worker->Setup(job_conf, train_net, nullptr, nullptr); + workers.push_back(worker); + } + } + return workers; +}*/ + +const vector Driver::CreateWorkers(const JobProto& job_conf) { + auto cluster = Cluster::Get(); + vector workers; + int ngpu_per_group = cluster->ngpu_per_group(); + + const vector rng = cluster->ExecutorRng(cluster->procs_id(), + cluster->nworkers_per_group(), cluster->nworkers_per_procs()); + int gstart = rng[0], gend = rng[1], wstart = rng[2], wend = rng[3]; + + for (int gid = gstart; gid < gend; gid++) { + for (int wid = wstart; wid < wend; wid++) { + auto *worker = Worker::Create(job_conf.train_one_batch(), gid, wid, + (ngpu_per_group > 0) ? 
job_conf.gpu(ngpu_per_group-1) : -1); + + ngpu_per_group--; workers.push_back(worker); } } diff --git a/src/stub.cc b/src/stub.cc index c06128c2a0..9a138c3b2e 100644 --- a/src/stub.cc +++ b/src/stub.cc @@ -66,7 +66,7 @@ const std::unordered_map CreateParamShard( // grp id -> worker id range std::unordered_map> grp2workers; for (auto worker : workers) { - int grp = worker->grp_id(), id = worker->id(); + int grp = worker->grp_id(), id = worker->dev_id(); if (grp2net.find(grp) == grp2net.end()) { grp2net[grp] = worker->train_net(); grp2workers[grp] = std::make_pair(id, id + 1); diff --git a/src/worker.cc b/src/worker.cc index bb9d1ef1c6..b3221178b8 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -33,30 +33,65 @@ namespace singa { using std::string; -Worker* Worker::Create(const AlgProto& conf) { - return Create(conf, -1); -} - -Worker* Worker::Create(const AlgProto& conf, int devid) { +Worker* Worker::Create(const AlgProto& conf, int grp_id, int dev_id, int dev_type) { auto factory = Singleton>::Instance(); Worker* worker = nullptr; if (conf.has_user_alg()) worker = factory->Create(conf.user_alg()); else worker = factory->Create(conf.alg()); - worker->device_id(devid); + worker->grp_id(grp_id); + worker->dev_id(dev_id); + worker->dev_type(dev_type); return worker; } -void Worker::Setup(int grp_id, int id, const JobProto& conf, - NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net) { - grp_id_ = grp_id; - id_ = id; - job_conf_ = conf; - train_net_ = train_net; - val_net_ = val_net; - test_net_ = test_net; - bridge_dealer_ = dealer_ = nullptr; +void Worker::Setup(const JobProto& job_conf) { + + LOG(ERROR) << "Setting up worker."; + + auto cluster = Cluster::Get(); + auto context = Singleton::Instance(); + context->SetupDevice(std::this_thread::get_id(), this->device_type_); + + int grp_size = cluster->nworkers_per_group(); + int nservers_per_grp = cluster->nservers_per_group(); + int nserver_grps = cluster->nserver_groups(); + + this->train_net_ = 
NeuralNet::Create(job_conf.neuralnet(), kTrain, grp_size); + this->test_net_ = nullptr; + this->val_net_ = nullptr; + + int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); + + const vector rng = cluster->ExecutorRng(cluster->procs_id(), + cluster->nworkers_per_group(), cluster->nworkers_per_procs()); + + if (this->grp_id_ == rng[0]) { + Param::SliceParams(lcm, this->train_net_->params()); + + if (this->grp_id_ == 0 && this->id_ == 0) { + if (job_conf.test_steps() > 0) { + this->test_net_ = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); + this->test_net_->ShareParamsFrom(this->train_net_, false); + } + + if (job_conf.validate_steps() > 0) { + this->val_net_ = NeuralNet::Create(job_conf.neuralnet(), kVal, 1); + this->val_net_->ShareParamsFrom(this->train_net_, false); + } + } + } else { + if (cluster->share_memory()) { + this->train_net_->ShareParamsFrom(this->train_net_, true); + } else { + Param::SliceParams(lcm, this->train_net_->params()); + } + } + + this->job_conf_ = job_conf; + this->bridge_dealer_ = dealer_ = nullptr; + LOG(ERROR) << "Done setting up worker."; } Worker::~Worker() { @@ -65,11 +100,11 @@ Worker::~Worker() { } void Worker::Run() { - auto context = Singleton::Instance(); - context->SetupDevice(std::this_thread::get_id(), this->device_id_); + //auto context = Singleton::Instance(); + //context->SetupDevice(std::this_thread::get_id(), this->device_id_); LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " - << " start on " << (device_id_ >= 0 ? "GPU " + std::to_string(device_id_) : "CPU"); + << " start on " << (device_type_ >= 0 ? "GPU " + std::to_string(device_type_) : "CPU"); auto cluster = Cluster::Get(); int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group(); From 1acee36bdf8751a085168570a43fdc576e2e8c71 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Wed, 23 Mar 2016 00:00:27 +0800 Subject: [PATCH 17/18] Removed useless conf files. 
--- examples/mnist/2x2.conf | 261 ------------------------------- examples/mnist/hybrid.conf | 256 ------------------------------ examples/mnist/hybrid_gpux2.conf | 258 ------------------------------ src/worker.cc | 4 +- 4 files changed, 2 insertions(+), 777 deletions(-) delete mode 100644 examples/mnist/2x2.conf delete mode 100644 examples/mnist/hybrid.conf delete mode 100644 examples/mnist/hybrid_gpux2.conf diff --git a/examples/mnist/2x2.conf b/examples/mnist/2x2.conf deleted file mode 100644 index d0da46ad3c..0000000000 --- a/examples/mnist/2x2.conf +++ /dev/null @@ -1,261 +0,0 @@ -name: "mlp" -train_steps: 1000 -test_steps:0 -test_freq:500 -disp_freq:100 -gpu:0 -gpu:1 - -train_one_batch { - alg: kBP -} - -updater{ - type: kSGD - learning_rate{ - type : kStep - base_lr: 0.001 - step_conf{ - change_freq: 60 - gamma: 0.997 - } - } -} - -neuralnet { - layer { - name: "data" - type: kRecordInput - store_conf { - backend: "kvfile" - path: "examples/mnist/train_data.bin" - random_skip: 5000 - batchsize: 64 - shape: 784 - std_value: 127.5 - mean_value: 127.5 - ratio: 5 - } - include: kTrain - } - - layer { - name: "data" - type: kRecordInput - store_conf { - backend: "kvfile" - path: "examples/mnist/test_data.bin" - batchsize: 100 - shape: 784 - std_value: 127.5 - mean_value: 127.5 - } - include: kTest - } - - layer{ - name: "fc1" - partition_dim: 0 - type: kInnerProduct - srclayers:"data" - innerproduct_conf{ - num_output: 2500 - } - param{ - name: "w1" - init { - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b1" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh1" - partition_dim: 0 - type: kSTanh - srclayers:"fc1" - } - layer{ - name: "fc2" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh1" - innerproduct_conf{ - num_output: 2000 - } - param{ - name: "w2" - init { - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b2" - init { - type: kUniform - low: -0.05 - high:0.05 - } - } - } - - 
layer{ - name: "tanh2" - partition_dim: 0 - type: kSTanh - srclayers:"fc2" - } - layer{ - name: "fc3" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh2" - innerproduct_conf{ - num_output: 1500 - } - param{ - name: "w3" - init{ - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b3" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - - } - - layer{ - name: "tanh3" - partition_dim: 0 - type: kSTanh - srclayers:"fc3" - } - layer{ - name: "fc4" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh3" - innerproduct_conf{ - num_output: 1000 - } - param{ - name: "w4" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b4" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - - } - - layer{ - name: "tanh4" - partition_dim: 0 - type: kSTanh - srclayers:"fc4" - } - layer{ - name: "fc5" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh4" - innerproduct_conf{ - num_output: 500 - } - param{ - name: "w5" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b5" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh5" - partition_dim: 0 - type: kSTanh - srclayers:"fc5" - } - layer{ - name: "fc6" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh5" - innerproduct_conf{ - num_output: 10 - } - param{ - name: "w6" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b6" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - layer{ - name: "loss" - type:kSoftmaxLoss - softmaxloss_conf{ - topk:1 - } - srclayers:"fc6" - srclayers:"data" - } -} -cluster { - nworker_groups: 1 - nserver_groups: 1 - nworkers_per_group: 4 - nworkers_per_procs: 4 - ngpu_per_group:2 - workspace: "examples/mnist" -} - diff --git a/examples/mnist/hybrid.conf b/examples/mnist/hybrid.conf deleted file mode 100644 index 37bfbdab1b..0000000000 --- a/examples/mnist/hybrid.conf +++ /dev/null @@ -1,256 +0,0 @@ -name: "mlp" -train_steps: 
1000 -test_steps:0 -test_freq:500 -disp_freq:100 - -train_one_batch { - alg: kBP -} - -updater{ - type: kSGD - learning_rate{ - type : kStep - base_lr: 0.001 - step_conf{ - change_freq: 60 - gamma: 0.997 - } - } -} - -neuralnet { - layer { - name: "data" - type: kRecordInput - store_conf { - backend: "kvfile" - path: "examples/mnist/train_data.bin" - random_skip: 5000 - batchsize: 64 - shape: 784 - std_value: 127.5 - mean_value: 127.5 - } - include: kTrain - } - - layer { - name: "data" - type: kRecordInput - store_conf { - backend: "kvfile" - path: "examples/mnist/test_data.bin" - batchsize: 100 - shape: 784 - std_value: 127.5 - mean_value: 127.5 - } - include: kTest - } - - layer{ - name: "fc1" - partition_dim: 0 - type: kInnerProduct - srclayers:"data" - innerproduct_conf{ - num_output: 2500 - } - param{ - name: "w1" - init { - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b1" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh1" - partition_dim: 0 - type: kSTanh - srclayers:"fc1" - } - layer{ - name: "fc2" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh1" - innerproduct_conf{ - num_output: 2000 - } - param{ - name: "w2" - init { - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b2" - init { - type: kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh2" - partition_dim: 0 - type: kSTanh - srclayers:"fc2" - } - layer{ - name: "fc3" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh2" - innerproduct_conf{ - num_output: 1500 - } - param{ - name: "w3" - init{ - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b3" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - - } - - layer{ - name: "tanh3" - partition_dim: 0 - type: kSTanh - srclayers:"fc3" - } - layer{ - name: "fc4" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh3" - innerproduct_conf{ - num_output: 1000 - } - param{ - name: "w4" - init { - type : kUniform - 
low:-0.05 - high:0.05 - } - } - param{ - name: "b4" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - - } - - layer{ - name: "tanh4" - partition_dim: 0 - type: kSTanh - srclayers:"fc4" - } - layer{ - name: "fc5" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh4" - innerproduct_conf{ - num_output: 500 - } - param{ - name: "w5" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b5" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh5" - partition_dim: 0 - type: kSTanh - srclayers:"fc5" - } - layer{ - name: "fc6" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh5" - innerproduct_conf{ - num_output: 10 - } - param{ - name: "w6" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b6" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - layer{ - name: "loss" - type:kSoftmaxLoss - softmaxloss_conf{ - topk:1 - } - srclayers:"fc6" - srclayers:"data" - } -} -cluster { - nworker_groups: 1 - nserver_groups: 1 - nworkers_per_group: 2 - nworkers_per_procs: 2 - workspace: "examples/mnist" -} diff --git a/examples/mnist/hybrid_gpux2.conf b/examples/mnist/hybrid_gpux2.conf deleted file mode 100644 index 692e4222dd..0000000000 --- a/examples/mnist/hybrid_gpux2.conf +++ /dev/null @@ -1,258 +0,0 @@ -name: "mlp" -train_steps: 1000 -test_steps:0 -test_freq:500 -disp_freq:100 -gpu:0 -gpu:1 - -train_one_batch { - alg: kBP -} - -updater{ - type: kSGD - learning_rate{ - type : kStep - base_lr: 0.001 - step_conf{ - change_freq: 60 - gamma: 0.997 - } - } -} - -neuralnet { - layer { - name: "data" - type: kRecordInput - store_conf { - backend: "kvfile" - path: "examples/mnist/train_data.bin" - random_skip: 5000 - batchsize: 64 - shape: 784 - std_value: 127.5 - mean_value: 127.5 - } - include: kTrain - } - - layer { - name: "data" - type: kRecordInput - store_conf { - backend: "kvfile" - path: "examples/mnist/test_data.bin" - batchsize: 100 - shape: 784 - 
std_value: 127.5 - mean_value: 127.5 - } - include: kTest - } - - layer{ - name: "fc1" - partition_dim: 0 - type: kInnerProduct - srclayers:"data" - innerproduct_conf{ - num_output: 2500 - } - param{ - name: "w1" - init { - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b1" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh1" - partition_dim: 0 - type: kSTanh - srclayers:"fc1" - } - layer{ - name: "fc2" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh1" - innerproduct_conf{ - num_output: 2000 - } - param{ - name: "w2" - init { - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b2" - init { - type: kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh2" - partition_dim: 0 - type: kSTanh - srclayers:"fc2" - } - layer{ - name: "fc3" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh2" - innerproduct_conf{ - num_output: 1500 - } - param{ - name: "w3" - init{ - type: kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b3" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - - } - - layer{ - name: "tanh3" - partition_dim: 0 - type: kSTanh - srclayers:"fc3" - } - layer{ - name: "fc4" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh3" - innerproduct_conf{ - num_output: 1000 - } - param{ - name: "w4" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b4" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - - } - - layer{ - name: "tanh4" - partition_dim: 0 - type: kSTanh - srclayers:"fc4" - } - layer{ - name: "fc5" - partition_dim: 0 - type: kInnerProduct - srclayers:"tanh4" - innerproduct_conf{ - num_output: 500 - } - param{ - name: "w5" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b5" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - - layer{ - name: "tanh5" - partition_dim: 0 - type: kSTanh - srclayers:"fc5" - } - layer{ - name: "fc6" - partition_dim: 0 - 
type: kInnerProduct - srclayers:"tanh5" - innerproduct_conf{ - num_output: 10 - } - param{ - name: "w6" - init { - type : kUniform - low:-0.05 - high:0.05 - } - } - param{ - name: "b6" - init { - type : kUniform - low: -0.05 - high:0.05 - } - } - } - layer{ - name: "loss" - type:kSoftmaxLoss - softmaxloss_conf{ - topk:1 - } - srclayers:"fc6" - srclayers:"data" - } -} -cluster { - nworker_groups: 1 - nserver_groups: 1 - nworkers_per_group: 2 - nworkers_per_procs: 2 - workspace: "examples/mnist" -} diff --git a/src/worker.cc b/src/worker.cc index b3221178b8..e7a3a9dbea 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -100,8 +100,8 @@ Worker::~Worker() { } void Worker::Run() { - //auto context = Singleton::Instance(); - //context->SetupDevice(std::this_thread::get_id(), this->device_id_); + auto context = Singleton::Instance(); + context->SetupDevice(std::this_thread::get_id(), this->dev_type()); LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " << " start on " << (device_type_ >= 0 ? "GPU " + std::to_string(device_type_) : "CPU"); From f22b24d5bc56eb6f844d8542301a2fe03a2e53a9 Mon Sep 17 00:00:00 2001 From: Tan Li Boon Date: Wed, 23 Mar 2016 01:33:56 +0800 Subject: [PATCH 18/18] Work for the night, going to try another strategy. 
--- examples/mnist/hybrid_gpu.conf | 5 +++-- include/singa/utils/context.h | 3 --- src/worker.cc | 12 ++++++------ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/examples/mnist/hybrid_gpu.conf b/examples/mnist/hybrid_gpu.conf index ead54e9849..e11a95895d 100644 --- a/examples/mnist/hybrid_gpu.conf +++ b/examples/mnist/hybrid_gpu.conf @@ -251,7 +251,8 @@ neuralnet { cluster { nworker_groups: 1 nserver_groups: 1 - nworkers_per_group: 2 - nworkers_per_procs: 2 + nworkers_per_group: 3 + nworkers_per_procs: 3 + ngpu_per_group: 1 workspace: "examples/mnist" } diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index 0fac1556d1..2afedb3a60 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -133,9 +133,6 @@ class Context { void SetupDevice(const std::thread::id& tid, const int did, const int seed) { device_id_[tid] = did; seed_[tid] = seed; - - if (did > -1 ) - ActivateDevice(did); } /** diff --git a/src/worker.cc b/src/worker.cc index e7a3a9dbea..b5fe8cb61f 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -48,11 +48,7 @@ Worker* Worker::Create(const AlgProto& conf, int grp_id, int dev_id, int dev_typ void Worker::Setup(const JobProto& job_conf) { - LOG(ERROR) << "Setting up worker."; - auto cluster = Cluster::Get(); - auto context = Singleton::Instance(); - context->SetupDevice(std::this_thread::get_id(), this->device_type_); int grp_size = cluster->nworkers_per_group(); int nservers_per_grp = cluster->nservers_per_group(); @@ -91,7 +87,6 @@ void Worker::Setup(const JobProto& job_conf) { this->job_conf_ = job_conf; this->bridge_dealer_ = dealer_ = nullptr; - LOG(ERROR) << "Done setting up worker."; } Worker::~Worker() { @@ -101,7 +96,12 @@ Worker::~Worker() { void Worker::Run() { auto context = Singleton::Instance(); - context->SetupDevice(std::this_thread::get_id(), this->dev_type()); + LOG(ERROR) << "Thread ID: " << std::this_thread::get_id() << " Type: " << this->dev_type() + << " Map: " << 
context->device_id(std::this_thread::get_id()); + if (this->dev_type() > -1) { + LOG(ERROR) << "Activating device."; + context->ActivateDevice(this->dev_type()); + } LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " << " start on " << (device_type_ >= 0 ? "GPU " + std::to_string(device_type_) : "CPU");