diff --git a/examples/cifar10/cudnn_hybrid.conf b/examples/cifar10/cudnn_hybrid.conf index a11145c170..3a60a96697 100644 --- a/examples/cifar10/cudnn_hybrid.conf +++ b/examples/cifar10/cudnn_hybrid.conf @@ -1,10 +1,10 @@ name: "cifar10-convnet" -train_steps: 10000 +train_steps: 1000 test_steps: 0 test_freq: 200 #validate_steps: 100 #validate_freq: 300 -disp_freq: 200 +disp_freq: 50 gpu: 0 gpu: 1 #debug: true @@ -43,7 +43,6 @@ neuralnet { shape: 32 } include: kTrain - partition_dim: 0 } # layer{ # name: "data" @@ -73,7 +72,6 @@ neuralnet { shape: 32 } include: kTest - partition_dim: 0 } layer { diff --git a/examples/cifar10/gpu.conf b/examples/cifar10/gpu.conf new file mode 100644 index 0000000000..6dbf72d6cb --- /dev/null +++ b/examples/cifar10/gpu.conf @@ -0,0 +1,282 @@ +name: "cifar10-convnet" +train_steps: 1000 +test_steps: 100 +test_freq: 200 +#validate_steps: 100 +#validate_freq: 300 +disp_freq: 50 +gpu:0 + + +#checkpoint_path: "examples/cifar10/checkpoint/step1000-worker0" +train_one_batch { + alg: kBP +} +updater{ + type: kSGD + weight_decay:0.004 + momentum:0.9 + learning_rate { + type: kFixedStep + fixedstep_conf:{ + step:0 + step:60000 + step:65000 + step_lr:0.001 + step_lr:0.0001 + step_lr:0.00001 + } + } +} +neuralnet { + layer{ + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/cifar10/train_data.bin" + mean_file: "examples/cifar10/image_mean.bin" + batchsize: 100 + #random_skip: 5000 + shape: 3 + shape: 32 + shape: 32 + } + include: kTrain + } +# layer{ +# name: "data" +# type: kRecordInput +# store_conf { +# backend: "kvfile" +# path: "examples/cifar10/val_data.bin" +# mean_file: "examples/cifar10/image_mean.bin" +# batchsize: 64 +# random_skip: 5000 +# shape: 3 +# shape: 32 +# shape: 32 +# } +# include: kVal +# } + layer{ + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/cifar10/test_data.bin" + mean_file: "examples/cifar10/image_mean.bin" + batchsize: 100 + shape: 3 + shape: 32 + shape: 32 + } + include: kTest + } + + layer { + name: "conv1" + type: kCConvolution + srclayers: "data" + convolution_conf { + num_filters: 32 + kernel: 5 + stride: 1 + pad:2 + } + param { + name: "w1" + init { + type:kGaussian + std:0.0001 + } + } + param { + name: "b1" + lr_scale:2.0 + init { + type: kConstant + value:0 + } + } + } + + layer { + name: "pool1" + type: kCPooling + srclayers: "conv1" + pooling_conf { + pool: MAX + kernel: 3 + stride: 2 + } + } + layer { + name: "relu1" + type: kReLU + srclayers:"pool1" + } + layer { + name: "norm1" + type: kLRN + lrn_conf { + local_size: 3 + alpha: 5e-05 + beta: 0.75 + } + srclayers:"relu1" + } + layer { + name: "conv2" + type: kCConvolution + srclayers: "norm1" + convolution_conf { + num_filters: 32 + kernel: 5 + stride: 1 + pad:2 + } + param { + name: "w2" + init { + type:kGaussian + std:0.01 + } + } + param { + name: "b2" + lr_scale:2.0 + init { + type: kConstant + value:0 + } + } + } + layer { + name: "relu2" + type: kReLU + srclayers:"conv2" + } + layer { + name: "pool2" + type: kCPooling + srclayers: "relu2" + pooling_conf { + pool: AVG + kernel: 3 + stride: 2 + } + } + layer { + name: "norm2" + type: kLRN + lrn_conf { + local_size: 3 + alpha: 5e-05 + beta: 0.75 + } + srclayers:"pool2" + } + layer { + name: "conv3" + type: kCConvolution + srclayers: "norm2" + convolution_conf { + num_filters: 64 + kernel: 5 + stride: 1 + pad:2 + } + param { + name: "w3" + init { + type:kGaussian + std:0.01 + } + } + param { + name: "b3" + init { + type: kConstant + value:0 + } + } + } + layer { + name: "relu3" + type: kReLU + srclayers:"conv3" + } + layer { + name: "pool3" + type: kCPooling + srclayers: "relu3" + pooling_conf { + pool: AVG + kernel: 3 + stride: 2 + } + } + layer { + name: "ip1" + type: kInnerProduct + srclayers:"pool3" + innerproduct_conf { + num_output: 10 + } + param { + name: "w4" + wd_scale:250 + init { + type:kGaussian + std:0.01 + } + } + param { + name: "b4" + lr_scale:2.0 + wd_scale:0 + init { + type: kConstant + value:0 + } + } + } +# layer { +# name : "softmax" +# type: kSoftmax +# srclayers: "ip1" +# } +# +# layer { +# name : "argsort" +# type: kArgSort +# srclayers: "softmax" +# } + layer{ + name: "loss" + type: kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"ip1" + srclayers: "data" + } +# uncomment "softmax", "argsort", "output" layer and comment "loss" layer +# to extract features from argsort +# layer { +# name : "output" +# type: kCSVOutput +# srclayers: "argsort" +# store_conf { +# path: "examples/cifar10/out.csv" +# } +# } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + nworkers_per_group: 1 + nworkers_per_procs: 1 + workspace: "examples/cifar10" +} diff --git a/examples/cifar10/hybrid.conf b/examples/cifar10/hybrid.conf index ec3da0cda7..19b1dc54b0 100644 --- a/examples/cifar10/hybrid.conf +++ b/examples/cifar10/hybrid.conf @@ -4,7 +4,9 @@ test_steps: 0 test_freq: 200 #validate_steps: 100 #validate_freq: 300 -disp_freq: 30 +disp_freq: 50 + + #debug: true #checkpoint_path: "examples/cifar10/checkpoint/step1000-worker0" train_one_batch { diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf index d20b452e34..433a5f7cc7 100644 --- a/examples/cifar10/job.conf +++ b/examples/cifar10/job.conf @@ -5,6 +5,9 @@ test_freq: 200 #validate_steps: 100 #validate_freq: 300 disp_freq: 50 + + + #checkpoint_path: "examples/cifar10/checkpoint/step1000-worker0" train_one_batch { alg: kBP diff --git a/examples/mnist/gpu.conf b/examples/mnist/gpu.conf new file mode 100644 index 0000000000..41777a5aef --- /dev/null +++ b/examples/mnist/gpu.conf @@ -0,0 +1,243 @@ +name: "mlp" +train_steps: 1000 +test_steps:10 +test_freq:500 +disp_freq:100 +gpu:0 + +train_one_batch { + alg: kBP +} +updater{ + type: kSGD + learning_rate{ + type : kStep + base_lr: 0.001 + step_conf{ + change_freq: 60 + gamma: 0.997 + } + } +} + +neuralnet { + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/train_data.bin" + random_skip: 5000 + batchsize: 64 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTrain + } + + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/test_data.bin" + batchsize: 100 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTest + } + + layer{ + name: "fc1" + type: kInnerProduct + srclayers:"data" + innerproduct_conf{ + num_output: 2500 + } + param{ + name: "w1" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b1" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh1" + type: kSTanh + srclayers:"fc1" + } + layer{ + name: "fc2" + type: kInnerProduct + srclayers:"tanh1" + innerproduct_conf{ + num_output: 2000 + } + param{ + name: "w2" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b2" + init { + type: kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh2" + type: kSTanh + srclayers:"fc2" + } + layer{ + name: "fc3" + type: kInnerProduct + srclayers:"tanh2" + innerproduct_conf{ + num_output: 1500 + } + param{ + name: "w3" + init{ + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b3" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh3" + type: kSTanh + srclayers:"fc3" + } + layer{ + name: "fc4" + type: kInnerProduct + srclayers:"tanh3" + innerproduct_conf{ + num_output: 1000 + } + param{ + name: "w4" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b4" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh4" + type: kSTanh + srclayers:"fc4" + } + layer{ + name: "fc5" + type: kInnerProduct + srclayers:"tanh4" + innerproduct_conf{ + num_output: 500 + } + param{ + name: "w5" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b5" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh5" + type: kSTanh + srclayers:"fc5" + } + layer{ + name: "fc6" + type: kInnerProduct + srclayers:"tanh5" + innerproduct_conf{ + num_output: 10 + } + param{ + name: "w6" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b6" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + layer{ + name: "loss" + type:kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"fc6" + srclayers:"data" + } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + workspace: "examples/mnist" +} diff --git a/examples/mnist/hybrid_gpu.conf b/examples/mnist/hybrid_gpu.conf new file mode 100644 index 0000000000..e11a95895d --- /dev/null +++ b/examples/mnist/hybrid_gpu.conf @@ -0,0 +1,258 @@ +name: "mlp" +train_steps: 1000 +test_steps:0 +test_freq:500 +disp_freq:100 +gpu:0 + +train_one_batch { + alg: kBP +} + +updater{ + type: kSGD + learning_rate{ + type : kStep + base_lr: 0.001 + step_conf{ + change_freq: 60 + gamma: 0.997 + } + } +} + +neuralnet { + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/train_data.bin" + random_skip: 5000 + batchsize: 64 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTrain + } + + layer { + name: "data" + type: kRecordInput + store_conf { + backend: "kvfile" + path: "examples/mnist/test_data.bin" + batchsize: 100 + shape: 784 + std_value: 127.5 + mean_value: 127.5 + } + include: kTest + } + + layer{ + name: "fc1" + partition_dim: 0 + type: kInnerProduct + srclayers:"data" + innerproduct_conf{ + num_output: 2500 + } + param{ + name: "w1" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b1" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh1" + partition_dim: 0 + type: kSTanh + srclayers:"fc1" + } + layer{ + name: "fc2" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh1" + innerproduct_conf{ + num_output: 2000 + } + param{ + name: "w2" + init { + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b2" + init { + type: kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh2" + partition_dim: 0 + type: kSTanh + srclayers:"fc2" + } + layer{ + name: "fc3" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh2" + innerproduct_conf{ + num_output: 1500 + } + param{ + name: "w3" + init{ + type: kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b3" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh3" + partition_dim: 0 + type: kSTanh + srclayers:"fc3" + } + layer{ + name: "fc4" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh3" + innerproduct_conf{ + num_output: 1000 + } + param{ + name: "w4" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b4" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + + } + + layer{ + name: "tanh4" + partition_dim: 0 + type: kSTanh + srclayers:"fc4" + } + layer{ + name: "fc5" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh4" + innerproduct_conf{ + num_output: 500 + } + param{ + name: "w5" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b5" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + + layer{ + name: "tanh5" + partition_dim: 0 + type: kSTanh + srclayers:"fc5" + } + layer{ + name: "fc6" + partition_dim: 0 + type: kInnerProduct + srclayers:"tanh5" + innerproduct_conf{ + num_output: 10 + } + param{ + name: "w6" + init { + type : kUniform + low:-0.05 + high:0.05 + } + } + param{ + name: "b6" + init { + type : kUniform + low: -0.05 + high:0.05 + } + } + } + layer{ + name: "loss" + type:kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"fc6" + srclayers:"data" + } +} +cluster { + nworker_groups: 1 + nserver_groups: 1 + nworkers_per_group: 3 + nworkers_per_procs: 3 + ngpu_per_group: 1 + workspace: "examples/mnist" +} diff --git a/examples/mnist/job.conf b/examples/mnist/job.conf index 41d6b6fdb5..decd7bcf2a 100644 --- a/examples/mnist/job.conf +++ b/examples/mnist/job.conf @@ -1,8 +1,10 @@ name: "mlp" train_steps: 1000 test_steps:10 -test_freq:60 -disp_freq:10 +test_freq:500 +disp_freq:100 + + train_one_batch { alg: kBP } diff --git a/examples/rnnlm/rnnlm.proto b/examples/rnnlm/rnnlm.proto index 8cfec8682b..4a4dcbc17d 100644 --- a/examples/rnnlm/rnnlm.proto +++ b/examples/rnnlm/rnnlm.proto @@ -39,9 +39,9 @@ message DataProto { } extend singa.LayerProto { - optional EmbeddingProto embedding_conf = 101; - optional LossProto loss_conf = 102; - optional DataProto data_conf = 103; + optional EmbeddingProto embedding_conf = 1001; + optional LossProto loss_conf = 1002; + optional DataProto data_conf = 1003; } message WordRecord { diff --git a/include/singa/driver.h b/include/singa/driver.h index fb5a33a8c7..f0934e403d 100644 --- a/include/singa/driver.h +++ b/include/singa/driver.h @@ -18,8 +18,8 @@ * under the License. * *************************************************************/ -#ifndef SINGA_SINGA_DRIVER_H_ -#define SINGA_SINGA_DRIVER_H_ +#ifndef SINGA_DRIVER_H_ +#define SINGA_DRIVER_H_ #include #include "singa/proto/job.pb.h" @@ -74,7 +74,7 @@ class Driver { * files. * @param[in] str serialized string recorded job configuration. */ - void Train(bool resume, const std::string str); + void Train(bool resume, const std::string str); /** * Create workers and servers to conduct the training. * @@ -125,7 +125,8 @@ class Driver { * @param[in] net training neural network. * @return worker instances */ - const vector CreateWorkers(const JobProto& job_conf, NeuralNet* net); +// const vector CreateWorkers(const JobProto& job_conf, NeuralNet* net); + const vector CreateWorkers(const JobProto& job_conf); /*********** Subclasses registers *************************/ @@ -259,4 +260,4 @@ int Driver::RegisterWorker(const Type& type) { } // namespace singa -#endif // SINGA_SINGA_DRIVER_H_ +#endif // SINGA_DRIVER_H_ diff --git a/include/singa/utils/cluster.h b/include/singa/utils/cluster.h index c1dc93bdc8..2114e9293c 100644 --- a/include/singa/utils/cluster.h +++ b/include/singa/utils/cluster.h @@ -53,6 +53,7 @@ class Cluster { inline int nservers_per_group() const { return cluster_.nservers_per_group();} inline int nworkers_per_procs() const { return cluster_.nworkers_per_procs();} inline int nservers_per_procs() const { return cluster_.nservers_per_procs();} + inline int ngpu_per_group() const { return cluster_.ngpu_per_group();} inline int nworker_groups_per_server_group() const { if (nserver_groups() == 0 || nservers_per_group() == 0) return 1; diff --git a/include/singa/utils/context.h b/include/singa/utils/context.h index b1128c1fb1..2afedb3a60 100644 --- a/include/singa/utils/context.h +++ b/include/singa/utils/context.h @@ -23,6 +23,7 @@ #define SINGA_UTILS_CONTEXT_H_ #include +#include #include #include #include @@ -145,7 +146,7 @@ class Context { } /** - * \copybreif rand_generator(const std::thread::id&); + * \copybrief rand_generator(const std::thread::id&); * @return the CPU random generator for the calling thread. */ std::mt19937* rand_generator() { @@ -170,7 +171,7 @@ class Context { } #ifdef USE_GPU /** - * \copybreif cublas_handle_(const std::thread::id&); + * \copybrief cublas_handle_(const std::thread::id&); * @return cublas handle for the calling thread. */ cublasHandle_t cublas_handle() { @@ -254,7 +255,7 @@ class Context { //!< max num of GPUs per process const int kMaxNumGPU = 64; //!< map from thread id to device id - std::unordered_map device_id_; + std::unordered_map device_id_; //!< map from thread id to cpu rand generator std::unordered_map rand_generator_; //!< map from thread id to cpu rand generator seed diff --git a/include/singa/utils/threadpool.h b/include/singa/utils/threadpool.h new file mode 100644 index 0000000000..362e303fbe --- /dev/null +++ b/include/singa/utils/threadpool.h @@ -0,0 +1,203 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#ifndef SINGA_THREADPOOL_H_ +#define SINGA_THREADPOOL_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace singa { + +/** + * Stores an arbitrary number of threads and facilitates the + * queueing and distribution of arbitrary tasks to various + * threads pooled in the thread pool. In the event that a task + * throws an exception, if the exception type is derived from + * std::exception then it will be logged, otherwise ignored, and + * the worker thread will return to the thread pool. Undefined + * behaviour occurs whenever an operation on the mutex object + * throws an exception. + */ +class Threadpool final { +public: + /** + * Create a thread pool with the specified size and initialize + * each worker thread. Provides basic exception safety. + */ + explicit Threadpool( const std::size_t size ); + + /** + * Destroy a thread pool. Must be called from a thread not managed + * by this thread pool. + */ + ~Threadpool(); + + // Enqueue the specified task. Provides basic exception safety. + template + auto enqueue( Fn &&fn, Args && ...args ) + ->std::future::type>; + + /** + * Retrieve the number of threads currently managed by this + * thread pool. + */ + int size() { return this->threads_.size(); } + +private: + // Type of a task. + using task_t = std::function; + // A list of worker threads. + std::vector threads_; + // A queue of tasks to be distributed and invoked. + std::queue task_queue_; + // A mutex for atomically getting and setting this threadpool's fields. + std::mutex queue_mutex_; + // Condition used to lock each worker thread until its been notified + // that a potential task is ready to be assigned to it and invoked. + std::condition_variable cond_handle_task_; + // Determine if this thread pool is being destroyed or there is at least + // one queued tasks to be distributed. + std::function ready_task_; + + // State of the thread pool. + bool stop; + + Threadpool( const Threadpool & ) = delete; // Disabled copy constructor. + Threadpool( Threadpool && ) = delete; // Disabled move constructor. + Threadpool &operator=( const Threadpool & ) = delete; // Disabled copy assignment. + Threadpool &operator=( Threadpool && ) = delete; // Disabled move assignment. +}; + +// Constructor +inline Threadpool::Threadpool( const size_t size ) : stop( false ) { + // Set the target function for the ready task predicate. + this->ready_task_ = [this] { + return this->stop || !this->task_queue_.empty(); + }; + + // Create the specified number of new worker threads. + for ( size_t k = 0; k < size; ++k ) { + this->threads_.emplace_back( [this] { + // This worker thread's assigned task. + task_t task; + // Synchronization state of this worker thread. + bool synchronize = false; + + // Iterate until this worker thread is to be synchronized. + do { + { + // Block this worker thread until it can accept a task from the + // thread pool. + std::unique_lock lock( this->queue_mutex_ ); + + this->cond_handle_task_.wait( lock, this->ready_task_ ); + + // Determine if we are destroying this thread pool and the tasks + // queue is empty. If so, synchronize this worker thread, otherwise + // dequeue a task and assign it to this worker thread. + if ( this->stop && this->task_queue_.empty() ) { + synchronize = true; + } else { + task = std::move( this->task_queue_.front() ); + this->task_queue_.pop(); + } + } + + // Determine if we are not synchronizing this worker thread. If so, + // invoke this worker thread's assigned task. + if ( !synchronize ) { + try { + task(); + } catch ( std::exception & ) { + // Log the exception caught by the worker thread's assigned task. + } catch ( ... ) {} + } + } while ( !synchronize ); + }); + } +} + +// Destructor +inline Threadpool::~Threadpool() { + // Change the state of the thread pool to "being destroyed". + { + std::lock_guard lock( this->queue_mutex_ ); + this->stop = true; + } + + // Notify all of the blocked worker threads of this state change. + this->cond_handle_task_.notify_all(); + + // Block the current thread until all of the worker threads from this thread + // pool are synchronized. + for ( auto &thread : this->threads_ ) { + thread.join(); + } +} + +template +inline auto Threadpool::enqueue( Fn &&fn, Args && ...args ) + -> std::future::type> { + + using return_type = typename std::result_of::type; + + // Package the specified task in to a new copyable managed object and + // preserve each argument's value category. + auto task = std::make_shared>( + std::bind( std::forward( fn ), std::forward( args )... ) + ); + + // Get the promised future return value of the specified task. + std::future retval = task->get_future(); + + // Enqueue the specified task. + { + std::unique_lock lock( this->queue_mutex_ ); + + // Determine if we are preparing to destroy this thread pool. If so, throw + // a runtime error exception. + if ( this->stop ) { + throw std::runtime_error( "Cannot enqueue a task while this thread pool " + "is preparing to be destroyed." ); + } + + //this->task_queue_.emplace( [task = std::move( task )]{ (*task)(); } ); + this->task_queue_.emplace( [task](){ (*task)(); } ); + } + + // Notify a worker thread that there is a new task to be invoked. + this->cond_handle_task_.notify_one(); + + return retval; +} + +} // namespace singa + +#endif // SINGA_THREADPOOL_H_ diff --git a/include/singa/worker.h b/include/singa/worker.h index 34c8000371..6f75a10dfe 100644 --- a/include/singa/worker.h +++ b/include/singa/worker.h @@ -58,14 +58,13 @@ class Worker { * -# Back-propagation for the feed-forward models, e.g., CNN and MLP, and the * recurrent neural networks. * -# Contrastive divergence for the energy models, e.g., RBM. - * + * TODO: Update this when it works. * @return a pointer to the instance of the Worker subclass. */ - static Worker* Create(const AlgProto& conf); + static Worker* Create(const AlgProto& conf, int grp_id, int dev_id, int dev_type); + virtual ~Worker(); /** - * @param[in] grp_id global worker group ID - * @param[in] id worker ID within the group * @param[in] conf job configuration * @param[in] train_net pointer to the training neural net, which could be * shared with other workers from the same group. Different workers run over @@ -77,8 +76,7 @@ class Worker { * first worker from the first group would have test neural net. All other * workers receive nullptr for this argument. */ - virtual void Setup(int grp_id, int id, const JobProto& conf, - NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net); + virtual void Setup(const JobProto& conf); /** * Main function of Worker. * @@ -261,16 +259,30 @@ class Worker { /** * @return group ID */ - inline int grp_id() const { return grp_id_; } + inline int grp_id() const { return this->grp_id_; } + inline void grp_id(int grp_id) { this->grp_id_ = grp_id; } + /** + * @return worker ID within the worker group. + */ + inline int dev_id() const { return this->id_; } + inline void dev_id(int id) { this->id_ = id; } + + /** + * @return The device type of this worker. It is -1 for CPU and >-1 for GPU. + */ + inline int dev_type() const { return this->device_type_; } + /** - * @reutrn worker ID within the worker group. + * @param devid The type value to assign to this worker. */ - inline int id() const { return id_; } + inline void dev_type(int dev_type) { this->device_type_ = dev_type; } protected: int grp_id_ = -1, id_ = -1; int step_ = 0; + bool execute_; JobProto job_conf_; + int device_type_; NeuralNet* train_net_ = nullptr; NeuralNet* test_net_ = nullptr; NeuralNet* val_net_ = nullptr; diff --git a/src/driver.cc b/src/driver.cc index 6163865620..f30b65800e 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -30,6 +30,7 @@ #include "singa/utils/tinydir.h" #include "singa/utils/cluster.h" #include "singa/utils/context.h" +#include "singa/utils/threadpool.h" #include "singa/proto/job.pb.h" #include "singa/server.h" #include "singa/stub.h" @@ -164,6 +165,9 @@ void Driver::InitLog(char* arg) { google::InitGoogleLogging(arg); } +////////// +// TRAIN +////////// void Driver::Train(bool resume, const std::string str) { JobProto job_conf; job_conf.ParseFromString(str); @@ -177,9 +181,9 @@ void Driver::Train(bool resume, const JobProto& job_conf) { Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); tinydir_dir workspace; if (tinydir_open(&workspace, job_conf.cluster().workspace().c_str()) == -1) - LOG(FATAL) << "workspace not exist: " << job_conf.cluster().workspace(); + LOG(FATAL) << "Workspace does not exist: " << job_conf.cluster().workspace(); if (job_conf.num_openblas_threads() != 1) - LOG(WARNING) << "openblas luanches " + LOG(WARNING) << "Openblas launched " << job_conf.num_openblas_threads() << " threads"; openblas_set_num_threads(job_conf.num_openblas_threads()); @@ -191,30 +195,12 @@ void Driver::Train(bool resume, const JobProto& job_conf) { Train(job); } -void Driver::Test(const std::string str) { - JobProto job_conf; - job_conf.ParseFromString(str); - Test(job_conf); -} - -void Driver::Test(const JobProto& job_conf) { - Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); - Cluster::Get()->Register(getpid(), "localhost"); - // TODO(wangwei) extend to a group with multiple workers - auto worker = Worker::Create(job_conf.train_one_batch()); - worker->Setup(0, 0, job_conf, nullptr, nullptr, nullptr); - auto net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); - WriteStringToTextFile(Cluster::Get()->vis_folder() + "/test_net.json", - net->ToGraph(true).ToJson()); - vector paths; - for (const auto& p : job_conf.checkpoint_path()) - paths.push_back(p); - net->Load(paths); - worker->Test(job_conf.test_steps(), kTest, net); -} - void Driver::Train(const JobProto& job_conf) { auto cluster = Cluster::Get(); + + Threadpool threadpool(cluster->nserver_groups() * cluster->nservers_per_group() + + cluster->nworker_groups() * cluster->nworkers_per_group()); + int nserver_grps = cluster->nserver_groups(); int grp_size = cluster->nworkers_per_group(); Stub stub; @@ -229,7 +215,8 @@ void Driver::Train(const JobProto& job_conf) { NeuralNet* net = NeuralNet::Create(job_conf.neuralnet(), kTrain, grp_size); WriteStringToTextFile(cluster->vis_folder() + "/train_net.json", net->ToGraph(true).ToJson()); - const vector workers = CreateWorkers(job_conf, net); +// const vector workers = CreateWorkers(job_conf, net); + const vector workers = CreateWorkers(job_conf); const vector servers = CreateServers(job_conf, net); #ifdef USE_MPI @@ -238,30 +225,25 @@ void Driver::Train(const JobProto& job_conf) { MPIQueues.push_back(make_shared()); #endif - vector threads; + // Setup the workers + for (auto worker: workers) + worker->Setup(job_conf); + + // Actual running for (auto server : servers) - threads.push_back(std::thread(&Server::Run, server)); - int gpu = 0; - auto context = Singleton::Instance(); - // CHECK_LE(workers.size(), job_conf.gpu_size()); - for (auto worker : workers) { - threads.push_back(std::thread(&Worker::Run, worker)); - int device_id = -1; - if (gpu < job_conf.gpu_size()) { - device_id = job_conf.gpu(gpu++); - } - context->SetupDevice(threads.back().get_id(), device_id); - } + threadpool.enqueue(&Server::Run, server); + for (auto worker : workers) + threadpool.enqueue(&Worker::Run, worker); + if (grp_size > 1 || nserver_grps > 0) { int nservers_per_grp = cluster->nservers_per_group(); int lcm = LeastCommonMultiple(nservers_per_grp, nserver_grps); auto slices = Param::ComputeSlices(lcm, net->params()); auto slice2server = PartitionSlices(nservers_per_grp, slices); + stub.Run(slice2server, workers, servers); } - for (auto& thread : threads) - thread.join(); for (auto server : servers) delete server; delete net; @@ -276,6 +258,34 @@ void Driver::Train(const JobProto& job_conf) { } } +////////// +// TEST +////////// +void Driver::Test(const std::string str) { + JobProto job_conf; + job_conf.ParseFromString(str); + Test(job_conf); +} + +void Driver::Test(const JobProto& job_conf) { + Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); + Cluster::Get()->Register(getpid(), "localhost"); + + // TODO(wangwei) extend to a group with multiple workers + auto worker = Worker::Create(job_conf.train_one_batch(), 0, 0, -1); + worker->Setup(job_conf); + + auto net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); + WriteStringToTextFile(Cluster::Get()->vis_folder() + "/test_net.json", + net->ToGraph(true).ToJson()); + + vector paths; + for (const auto& p : job_conf.checkpoint_path()) + paths.push_back(p); + net->Load(paths); + worker->Test(job_conf.test_steps(), kTest, net); +} + void Driver::SetupForResume(JobProto* job_conf) { tinydir_dir dir; std::string folder = Cluster::Get()->checkpoint_folder(); @@ -315,23 +325,31 @@ void Driver::SetupForResume(JobProto* job_conf) { tinydir_close(&dir); } +/* const vector Driver::CreateWorkers(const JobProto& job_conf, NeuralNet* net) { auto cluster = Cluster::Get(); vector workers; - if (!cluster->has_worker()) return workers; - int wgrp_size = cluster->nworkers_per_group(); - int nservers_per_grp = cluster->nservers_per_group(); - int nserver_grps = cluster->nserver_groups(); + + if (!cluster->has_worker()) + return workers; + + int nservers_per_grp = cluster->nservers_per_group(), + nserver_grps = cluster->nserver_groups(), + ngpu_per_group = cluster->ngpu_per_group(); int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); + const vector rng = cluster->ExecutorRng(cluster->procs_id(), cluster->nworkers_per_group(), cluster->nworkers_per_procs()); int gstart = rng[0], gend = rng[1], wstart = rng[2], wend = rng[3]; + LOG(ERROR) << "gstart: " << gstart << " gend: " << gend + << " wstart: " << wstart << " wend: " << wend; + for (int gid = gstart; gid < gend; gid++) { - NeuralNet* train_net = nullptr, *test_net = nullptr, *val_net = nullptr; + NeuralNet* train_net = net, *test_net = nullptr, *val_net = nullptr; if (gid == gstart) { - train_net = net; Param::SliceParams(lcm, train_net->params()); + // test and validation are performed by the 1st group. if (gid == 0 && job_conf.test_steps() > 0) { test_net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); @@ -341,21 +359,46 @@ const vector Driver::CreateWorkers(const JobProto& job_conf, val_net = NeuralNet::Create(job_conf.neuralnet(), kVal, 1); val_net->ShareParamsFrom(train_net, false); } + } else { - train_net = NeuralNet::Create(job_conf.neuralnet(), kTrain, wgrp_size); if (cluster->share_memory()) { train_net->ShareParamsFrom(net, true); } else { Param::SliceParams(lcm, train_net->params()); } } + for (int wid = wstart; wid < wend; wid++) { - auto *worker = Worker::Create(job_conf.train_one_batch()); + // Set DeviceType to CPU or GPU here. + auto *worker = Worker::Create(job_conf.train_one_batch(), gid, wid, + (ngpu_per_group > 0) ? job_conf.gpu(ngpu_per_group-1) : -1); + ngpu_per_group--; // TODO(wangwei) extend to test among workers in a grp if (wid == 0) - worker->Setup(gid, wid, job_conf, train_net, val_net, test_net); + worker->Setup(job_conf, train_net, val_net, test_net); else - worker->Setup(gid, wid, job_conf, train_net, nullptr, nullptr); + worker->Setup(job_conf, train_net, nullptr, nullptr); + workers.push_back(worker); + } + } + return workers; +}*/ + +const vector Driver::CreateWorkers(const JobProto& job_conf) { + auto cluster = Cluster::Get(); + vector workers; + int ngpu_per_group = cluster->ngpu_per_group(); + + const vector rng = cluster->ExecutorRng(cluster->procs_id(), + cluster->nworkers_per_group(), cluster->nworkers_per_procs()); + int gstart = rng[0], gend = rng[1], wstart = rng[2], wend = rng[3]; + + for (int gid = gstart; gid < gend; gid++) { + for (int wid = wstart; wid < wend; wid++) { + auto *worker = Worker::Create(job_conf.train_one_batch(), gid, wid, + (ngpu_per_group > 0) ? job_conf.gpu(ngpu_per_group-1) : -1); + + ngpu_per_group--; workers.push_back(worker); } } @@ -383,6 +426,7 @@ const vector Driver::CreateServers(const JobProto& job_conf, const vector rng = cluster->ExecutorRng(server_procs, cluster->nservers_per_group(), cluster->nservers_per_procs()); int gstart = rng[0], gend = rng[1], start = rng[2], end = rng[3]; + for (int gid = gstart; gid < gend; gid++) { for (int sid = start; sid < end; sid++) { auto server = new Server(gid, sid, job_conf, slice2group, slice2server); diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc index 283b0c73a7..dc3d379d8e 100644 --- a/src/neuralnet/input_layer/store.cc +++ b/src/neuralnet/input_layer/store.cc @@ -38,6 +38,7 @@ void StoreInputLayer::Setup(const LayerProto& conf, if (conf.partition_dim() == 0) { batchsize_ /= conf.num_partitions(); } + //LOG(ERROR) << "Ratio: " << layer_conf_.store_conf().ratio(); } void StoreInputLayer::ComputeFeature(int flag, diff --git a/src/proto/common.proto b/src/proto/common.proto index b1ba1b686f..514ed1630e 100644 --- a/src/proto/common.proto +++ b/src/proto/common.proto @@ -50,6 +50,12 @@ enum ConnectionType { kOneToMany = 2; } +enum DeviceType { + kCPU = 0; + kGPU = 1; + kOther = 2; +} + // to import caffe's lmdb dataset message CaffeDatum { optional int32 channels = 1; diff --git a/src/proto/job.proto b/src/proto/job.proto index 7bc0ea3b9b..b40f793ba1 100644 --- a/src/proto/job.proto +++ b/src/proto/job.proto @@ -166,6 +166,8 @@ message ClusterProto { optional int32 nservers_per_group = 4 [default = 1]; optional int32 nworkers_per_procs = 5 [default = 1]; optional int32 nservers_per_procs = 6 [default = 1]; + optional int32 ngpu_per_group = 7 [default = 0]; + // local workspace for checkpoint files and vis files //required string workspace = 10; optional string workspace = 10; @@ -379,6 +381,7 @@ message StoreProto { optional bool encoded = 10 [default = false]; optional int32 random_skip = 11 [default = 0]; optional bool has_label = 12 [default = true]; + optional int32 ratio = 13 [default = 1]; } message CharRNNProto { optional string path = 1; diff --git a/src/stub.cc b/src/stub.cc index c06128c2a0..9a138c3b2e 100644 --- a/src/stub.cc +++ b/src/stub.cc @@ -66,7 +66,7 @@ const std::unordered_map CreateParamShard( // grp id -> worker id range std::unordered_map> grp2workers; for (auto worker : workers) { - int grp = worker->grp_id(), id = worker->id(); + int grp = worker->grp_id(), id = worker->dev_id(); if (grp2net.find(grp) == grp2net.end()) { grp2net[grp] = worker->train_net(); grp2workers[grp] = std::make_pair(id, id + 1); diff --git a/src/worker.cc b/src/worker.cc index 2afa8b06fe..b5fe8cb61f 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -22,8 +22,6 @@ #include "singa/worker.h" #include -#include -#include #include #include "singa/utils/cluster.h" #include "singa/utils/factory.h" @@ -35,25 +33,60 @@ namespace singa { using std::string; -Worker* Worker::Create(const AlgProto& conf) { +Worker* Worker::Create(const AlgProto& conf, int grp_id, int dev_id, int dev_type) { auto factory = Singleton>::Instance(); Worker* worker = nullptr; if (conf.has_user_alg()) worker = factory->Create(conf.user_alg()); else worker = factory->Create(conf.alg()); + worker->grp_id(grp_id); + worker->dev_id(dev_id); + worker->dev_type(dev_type); return worker; } -void Worker::Setup(int grp_id, int id, const JobProto& conf, - NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net) { - grp_id_ = grp_id; - id_ = id; - job_conf_ = conf; - train_net_ = train_net; - val_net_ = val_net; - test_net_ = test_net; - bridge_dealer_ = dealer_ = nullptr; +void Worker::Setup(const JobProto& job_conf) { + + auto cluster = Cluster::Get(); + + int grp_size = cluster->nworkers_per_group(); + int nservers_per_grp = cluster->nservers_per_group(); + int nserver_grps = cluster->nserver_groups(); + + this->train_net_ = NeuralNet::Create(job_conf.neuralnet(), kTrain, grp_size); + this->test_net_ = nullptr; + this->val_net_ = nullptr; + + int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); + + const vector rng = cluster->ExecutorRng(cluster->procs_id(), + cluster->nworkers_per_group(), cluster->nworkers_per_procs()); + + if (this->grp_id_ == rng[0]) { + Param::SliceParams(lcm, this->train_net_->params()); + + if (this->grp_id_ == 0 && this->id_ == 0) { + if (job_conf.test_steps() > 0) { + this->test_net_ = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); + this->test_net_->ShareParamsFrom(this->train_net_, false); + } + + if (job_conf.validate_steps() > 0) { + this->val_net_ = NeuralNet::Create(job_conf.neuralnet(), kVal, 1); + this->val_net_->ShareParamsFrom(this->train_net_, false); + } + } + } else { + if (cluster->share_memory()) { + this->train_net_->ShareParamsFrom(this->train_net_, true); + } else { + Param::SliceParams(lcm, this->train_net_->params()); + } + } + + this->job_conf_ = job_conf; + this->bridge_dealer_ = dealer_ = nullptr; } Worker::~Worker() { @@ -62,13 +95,16 @@ Worker::~Worker() { } void Worker::Run() { - // setup gpu device auto context = Singleton::Instance(); - int device = context->device_id(std::this_thread::get_id()); + LOG(ERROR) << "Thread ID: " << std::this_thread::get_id() << " Type: " << this->dev_type() + << " Map: " << context->device_id(std::this_thread::get_id()); + if (this->dev_type() > -1) { + LOG(ERROR) << "Activating device."; + context->ActivateDevice(this->dev_type()); + } + LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " - << " start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU"); - if (device >= 0) - context->ActivateDevice(device); + << " start on " << (device_type_ >= 0 ? "GPU " + std::to_string(device_type_) : "CPU"); auto cluster = Cluster::Get(); int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();