// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE.md file in the project root for full license information. // #include "stdafx.h" #include #include "Basics.h" #include "Constants.h" #include "CNTKLibrary.h" #include "DistributedCommunicator.h" #include "CUDAPageLockedMemAllocator.h" #include "MatrixQuantizerImpl.h" #include "GPUDataTransferer.h" #include #include "Utils.h" using namespace Microsoft::MSR::CNTK; namespace CNTK { void Recreate(const std::vector& values, std::vector& output) { output.resize(values.size()); for (size_t i = 0; i < values.size(); ++i) { const auto inputView = values[i]; output[i] = MakeSharedObject(inputView->GetDataType(), inputView->Shape(), inputView->Device()); } } DistributedCommunicatorPtr MPICommunicator(size_t packThresholdSizeInBytes) { return std::make_shared(packThresholdSizeInBytes); } void DistributedCommunicator::Finalize() { auto mpi = MPIWrapper::GetInstance(false); if (mpi) mpi->Finalize(); MPIWrapper::DeleteInstance(); } MPICommunicatorImpl::Buffer MPICommunicatorImpl::AllocateIntermediateBuffer(int deviceID, size_t totalSize) { assert(deviceID >= 0); Buffer buffer; buffer.totalSize = totalSize; buffer.data = std::shared_ptr( CUDAPageLockedMemAllocator::Malloc(totalSize, deviceID), [deviceID](void* p) { CUDAPageLockedMemAllocator::Free(p, deviceID); }); return buffer; } inline size_t GetBufferSize(const NDArrayViewPtr& viewPtr) { return viewPtr->Shape().TotalSize() * DataTypeSize(viewPtr->GetDataType()); } inline void* GetDataBuffer(const NDArrayViewPtr& viewPtr) { if (viewPtr->GetDataType() == DataType::Float) return viewPtr->WritableDataBuffer(); if (viewPtr->GetDataType() == DataType::Double) return viewPtr->WritableDataBuffer(); LogicError("Unknown DataType"); return nullptr; // Make compiler happy. } MPICommunicatorImpl::MPICommunicatorImpl(size_t packThresholdSizeInBytes) { m_mpi = MPIWrapper::GetInstance(); if (m_mpi == nullptr) { m_mpi = MPIWrapper::GetInstance(true /*create*/); } m_currentWorker.m_globalRank = m_mpi->CurrentNodeRank(); m_currentWorker.m_hostId = std::wstring(m_mpi->CurrentNodeName()); for (size_t i = 0; i < m_mpi->NumNodesInUse(); ++i) { if (i == m_currentWorker.m_globalRank) m_workers.insert(m_currentWorker); else // TOOD: Nodes have to exchange their names. m_workers.insert({ i, L"" }); } m_packThresholdSizeInBytes = packThresholdSizeInBytes; } void MPICommunicatorImpl::Initialize(const std::vector& values) { assert(CPUDEVICE < 0); // just in case somebody decides to change CPUDEVICE macro. DeviceDescriptor lastGpuDevice = DeviceDescriptor::CPUDevice(); m_gpuDataTransferers.resize(values.size()); m_intermediateCPUBuffers.resize(values.size()); for (auto i = 0; i < values.size(); ++i) { auto view = values[i]; auto device = view->Device(); // Make sure none of the values are sparse - we currently do not support aggregation of sparse matrices if (view->GetStorageFormat() != StorageFormat::Dense) RuntimeError("MPICommunicator: Aggregation for sparse matrices is currently not supported."); // TODO: device.Type should be called Kind. if (device.Type() == DeviceKind::CPU) { m_intermediateCPUBuffers[i] = Buffer(); m_gpuDataTransferers[i] = nullptr; } else if (device.Type() == DeviceKind::GPU) { if (lastGpuDevice.Type() == DeviceKind::CPU) lastGpuDevice = device; else if (device.Id() != lastGpuDevice.Id()) // For the time being, assume all devices have the same id. LogicError("MPICommunicator: Not all values are on the same GPU device id"); auto requiredSize = GetBufferSize(view); m_gpuDataTransferers[i] = std::make_shared(device.Id(), true); if (m_intermediateCPUBuffers[i].totalSize < requiredSize) m_intermediateCPUBuffers[i] = AllocateIntermediateBuffer(device.Id(), requiredSize); } else { LogicError("Invalid device type (%u).", (unsigned int)device.Type()); } } } const std::unordered_set& MPICommunicatorImpl::Workers() const { return m_workers; } const DistributedWorkerDescriptor& MPICommunicatorImpl::CurrentWorker() const { return m_currentWorker; } void MPICommunicatorImpl::CheckWorkers(const std::unordered_set& sendToWorkers) { // Currently all operations should be executed on all workers, we do not support subgroups. if (sendToWorkers != m_workers) NOT_IMPLEMENTED; } void MPICommunicatorImpl::Aggregate(const std::vector& values, std::vector& outputValues, const std::unordered_set& sendToWorkers) { if (outputValues.empty()) { Recreate(values, outputValues); } else if (outputValues.size() != values.size()) { NOT_IMPLEMENTED; } auto device = GetNonCPUDevice(values); if (device.Type() == DeviceKind::GPU) { // Since we will be copying the gradients asynchronously, let us // ensure that the gradient matrices have been computed before starting to aggregate // them asynchronously on another thread. This essentially means that when we are using // a GPU device, we will synchronize on the main GPU compute stream before starting // the gradient aggregation asynchronously on a separate stream std::unique_ptr mainStreamSyncEvent(MatrixComputeStreamEvent::Create(device.Id())); mainStreamSyncEvent->SynchronizeDataTransferFetchStreamWithEvent(); } else { LogicError("Invalid device type (%u).", (unsigned int)device.Type()); } AggregateImpl(values, outputValues, sendToWorkers); } DistributedCommunicatorPtr MPICommunicatorImpl::SubGroup(const std::unordered_set&) const { NOT_IMPLEMENTED; } void MPICommunicatorImpl::Concatenate(const std::vector&, std::vector&, const std::unordered_set&) { NOT_IMPLEMENTED; } void MPICommunicatorImpl::Gather( const Dictionary& input, std::vector>& output, const std::unordered_set& sendToWorkers) { CheckWorkers(sendToWorkers); std::stringstream dict; dict << input; std::string encoded = dict.str(); // Exchange data sizes. int encodedSizeInBytes = (int)encoded.size(); std::vector othersSize; othersSize.resize(m_mpi->NumNodesInUse()); m_mpi->Gather(&encodedSizeInBytes, 1, &othersSize[0], 1, 0); output.resize(m_mpi->NumNodesInUse(), std::make_shared()); int totalSizeInBytes = std::accumulate(othersSize.begin(), othersSize.end(), 0); // Exchange actual data std::vector gathered; gathered.resize(std::max(totalSizeInBytes, 1)); // buffer should be at least of size 1. std::vector offsets; offsets.resize(m_mpi->NumNodesInUse()); int currentOffset = 0; for (size_t i = 0; i < offsets.size(); ++i) { offsets[i] = currentOffset; currentOffset += othersSize[i]; } m_mpi->Gatherv(&encoded[0], encoded.size(), &gathered[0], &othersSize[0], &offsets[0], 0); if (CurrentWorker().m_globalRank != 0) return; offsets.push_back(totalSizeInBytes); output.resize(m_workers.size()); for (size_t i = 0; i < offsets.size() - 1; ++i) { size_t startOffset = offsets[i]; size_t size = offsets[i + 1] - startOffset; std::stringstream ss; ss.write(&gathered[startOffset], size); output[i] = std::make_shared(); ss >> *output[i]; } } void MPICommunicatorImpl::Concatenate(const std::vector& input, std::vector& output, const std::unordered_set& workers) { // TODO: Currently we only support concatenation of inputs of the same size. CheckWorkers(workers); // Check inputs, currently we support only CPU auto nonCpu = std::find_if(input.begin(), input.end(), [](const NDArrayViewPtr& v) { return v->Device() != DeviceDescriptor::CPUDevice(); }); if (nonCpu != input.end()) LogicError("MPICommunicator: Currently only NDArrayViews located on CPU are supported for concatenation."); output.resize(input.size()); // Currently we only support concatenation of input of the same size. // Gathering blocks sequentially. for (size_t i = 0; i < input.size(); ++i) { if (output[i] == nullptr || output[i]->Shape().TotalSize() != m_mpi->NumNodesInUse() * input[i]->Shape().TotalSize() || output[i]->GetDataType() != input[i]->GetDataType()) { // Allocating flat array for all ranks. output[i] = std::make_shared(input[i]->GetDataType(), NDShape{ input[i]->Shape().TotalSize() * m_mpi->NumNodesInUse() }, DeviceDescriptor::CPUDevice()); } } // Initiate concatenation. std::vector allReduceRequests(input.size()); for (size_t i = 0; i < input.size(); ++i) { auto& in = input[i]; auto& out = output[i]; if (input[i]->GetDataType() == DataType::Float) m_mpi->AllGatherAsync(in->DataBuffer(), in->Shape().TotalSize(), out->WritableDataBuffer(), in->Shape().TotalSize(), &allReduceRequests[i]); else if (input[i]->GetDataType() == DataType::Double) m_mpi->AllGatherAsync(in->DataBuffer(), in->Shape().TotalSize(), out->WritableDataBuffer(), in->Shape().TotalSize(), &allReduceRequests[i]); else LogicError("MPICommunicator: input DataType is not supported."); } // Wait till all requests are finished. m_mpi->WaitAll(allReduceRequests); } void MPICommunicatorImpl::AggregateInPlace( const std::vector& values, const std::unordered_set& sendToWorkers) { AggregateImpl(values, values, sendToWorkers); } void MPICommunicatorImpl::AggregateImpl( const std::vector& inputValues, const std::vector& outputValues, const std::unordered_set& sendToWorkers) { CheckWorkers(sendToWorkers); if (m_mpi->NumNodesInUse() == 1) // No need to aggregate anything. return; assert(inputValues.size() == outputValues.size()); auto numValues = inputValues.size(); if (numValues == 0) return; std::vector valuesToAggregate; // Corresponding to inputValues std::vector valuesAfterAggregate; // Corresponding to outputValues size_t packedFloatGradientsSizeInBytes = 0; size_t packedDoubleGradientsSizeInBytes = 0; std::vector packedFloatGradientsIndex; std::vector packedDoubleGradientsIndex; for (auto i = 0; i < numValues; i++) { // Push index to packing queue if the gradient's size is less than threshold size if (GetBufferSize(inputValues[i]) < m_packThresholdSizeInBytes && (inputValues[i]->GetDataType() == DataType::Float)) { packedFloatGradientsSizeInBytes += GetBufferSize(inputValues[i]); packedFloatGradientsIndex.push_back(i); } else if (GetBufferSize(inputValues[i]) < m_packThresholdSizeInBytes && (inputValues[i]->GetDataType() == DataType::Double)) { packedDoubleGradientsSizeInBytes += GetBufferSize(inputValues[i]); packedDoubleGradientsIndex.push_back(i); } else { valuesToAggregate.push_back(inputValues[i]); valuesAfterAggregate.push_back(outputValues[i]); } } // Do the packing to reduce the number of MPI requests. // Do not re-allocating the continous buffer if existing buffer size equals to required one. m_aggregationBufferFloat = SetContinuousBuffer(packedFloatGradientsIndex, packedFloatGradientsSizeInBytes, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate); m_aggregationBufferDouble = SetContinuousBuffer(packedDoubleGradientsIndex, packedDoubleGradientsSizeInBytes, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate); PackToContinuousBuffer(m_aggregationBufferFloat.get(), packedFloatGradientsIndex, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate); PackToContinuousBuffer(m_aggregationBufferDouble.get(), packedDoubleGradientsIndex, inputValues, outputValues, valuesToAggregate, valuesAfterAggregate); numValues = valuesToAggregate.size(); Initialize(valuesToAggregate); // We need to make sure no compuatation happens on the main CUDA stream. auto device = GetNonCPUDevice(valuesToAggregate); if (device.Type() != DeviceKind::CPU) { // Since we will be copying the gradients asynchronously, let us // ensure that the gradient matrices have been computed before starting to aggregate // them asynchronously on another thread. This essentially means that when we are using // a GPU device, we will synchronize on the main GPU compute stream before starting // the gradient aggregation asynchronously on a separate stream std::unique_ptr mainStreamSyncEvent(MatrixComputeStreamEvent::Create(device.Id())); mainStreamSyncEvent->SynchronizeDataTransferFetchStreamWithEvent(); } // BUGBUG: assuming the all values on the same device if (m_nccl == nullptr) { m_nccl.reset(new NcclComm(AsCNTKImplDeviceId(inputValues[0]->Device()), m_mpi)); } // For all values residing on GPU initiate async transfer to CPU buffers if needed CopyDataFromGPUToCPU(valuesToAggregate); std::vector allReduceRequests; for (auto i = 0; i < numValues; ++i) { auto inputValue = valuesToAggregate[i]; if (ShouldCopyDataToCPU(inputValue)) { // TODO: actually, we can start reducing all cpu values first, and then wait for the gpu->cpu transfer to finish. m_gpuDataTransferers[i]->WaitForCopyGPUToCPUAsync(); } auto numElements = inputValue->Shape().TotalSize(); auto dataType = inputValue->GetDataType(); auto& outputValue = valuesAfterAggregate[i]; assert(numElements == outputValue->Shape().TotalSize()); assert(dataType == outputValue->GetDataType()); assert(inputValue->Device() == outputValue->Device()); void* inputData = (ShouldCopyDataToCPU(inputValue)) ? m_intermediateCPUBuffers[i].data.get() : GetDataBuffer(inputValue); void* outputData = (ShouldCopyDataToCPU(inputValue)) ? m_intermediateCPUBuffers[i].data.get() : GetDataBuffer(outputValue); if (dataType == DataType::Float) { AllReduceGradients(static_cast(inputData), static_cast(outputData), numElements, allReduceRequests, (inputValue->Device() == DeviceDescriptor::CPUDevice())); } else if (dataType == DataType::Double) { AllReduceGradients(static_cast(inputData), static_cast(outputData), numElements, allReduceRequests, (inputValue->Device() == DeviceDescriptor::CPUDevice())); } else LogicError("MPICommunicator: Unknown DataType."); } if (m_nccl->IsSupported()) { m_nccl->Sync(); } // wait for async all reduce to complete. As soon as one of the requests is finished, // check if corresponding value is gpu bound and, if it is the case, initiate a cpu-to-gpu transfer. size_t numAllReduceRequestsCompleted = 0; while (numAllReduceRequestsCompleted < allReduceRequests.size()) { int idx = MPI_UNDEFINED; m_mpi->WaitAny(allReduceRequests.data(), (int)allReduceRequests.size(), &idx); if (idx == MPI_UNDEFINED) { break; } numAllReduceRequestsCompleted++; assert(idx < valuesToAggregate.size()); auto value = valuesToAggregate[idx]; if (ShouldCopyDataToCPU(value)) { auto view = valuesAfterAggregate[idx]; auto size = GetBufferSize(view); auto& transferer = m_gpuDataTransferers[idx]; auto& buffer = m_intermediateCPUBuffers[idx]; transferer->CopyCPUToGPUAsync(buffer.data.get(), size, GetDataBuffer(view)); } } // TODO: Should not wait, simply publishing event on the compute stream should be sufficient for (auto i = 0; i < numValues; ++i) { if (ShouldCopyDataToCPU(valuesToAggregate[i])) m_gpuDataTransferers[i]->WaitForCopyCPUToGPUAsync(); } // Unpack the continuous buffer UnpackFromContinuousBuffer(m_aggregationBufferFloat.get(), outputValues, packedFloatGradientsIndex); UnpackFromContinuousBuffer(m_aggregationBufferDouble.get(), outputValues, packedDoubleGradientsIndex); } void MPICommunicatorImpl::Barrier() { m_mpi->WaitAll(); } bool MPICommunicatorImpl::ShouldCopyDataToCPU(NDArrayViewPtr inputValue) { if (inputValue->Device() == DeviceDescriptor::CPUDevice()) return false; // Donot copy if NCCL is supported or GPUDirect RDMA is used if (m_nccl->IsSupported() || m_mpi->UseGpuGdr()) return false; return true; } void MPICommunicatorImpl::CopyDataFromGPUToCPU(std::vector& inputValues) { for (auto i = 0; i < inputValues.size(); ++i) { auto view = inputValues[i]; if (ShouldCopyDataToCPU(inputValues[i])) { auto& transferer = m_gpuDataTransferers[i]; auto& buffer = m_intermediateCPUBuffers[i]; transferer->CopyGPUToCPUAsync(GetDataBuffer(view), GetBufferSize(view), buffer.data.get()); } } } template std::unique_ptr> MPICommunicatorImpl::SetContinuousBuffer(std::vector& packedGradientsIndex, size_t packedGradientsSizeInBytes, const std::vector& inputValues, const std::vector& outputValues, std::vector& valuesToAggregate, std::vector& valuesAfterAggregate) { if (packedGradientsIndex.size() > 1) { return std::unique_ptr>{new (std::nothrow) Matrix(1, packedGradientsSizeInBytes / sizeof(ElemType), AsCNTKImplDeviceId(inputValues[packedGradientsIndex[0]]->Device()))}; } else if (packedGradientsIndex.size() == 1) { valuesToAggregate.push_back(inputValues[packedGradientsIndex.front()]); valuesAfterAggregate.push_back(outputValues[packedGradientsIndex.front()]); packedGradientsIndex.clear(); } return std::unique_ptr>{ nullptr }; } template void MPICommunicatorImpl::PackToContinuousBuffer(Matrix* aggregationBuffer, std::vector& packedGradientsIndex, const std::vector& inputValues, const std::vector& outputValues, std::vector& valuesToAggregate, std::vector& valuesAfterAggregate) { if (packedGradientsIndex.size() < 1) { return; } if (aggregationBuffer == nullptr || packedGradientsIndex.size() == 1) { for (size_t i : packedGradientsIndex) { valuesToAggregate.push_back(inputValues[i]); valuesAfterAggregate.push_back(outputValues[i]); } packedGradientsIndex.clear(); return; } size_t offset = 0; for (size_t i : packedGradientsIndex) { auto gradient = GetWritableMatrix(inputValues[i]); aggregationBuffer->ColumnSlice(offset, gradient->GetNumElements()).AssignValuesOf(gradient->Reshaped(1, gradient->GetNumElements())); offset += gradient->GetNumElements(); } ::CNTK::NDShape shape{ aggregationBuffer->GetNumElements() }; auto data = ::CNTK::MakeSharedObject<::CNTK::NDArrayView>(inputValues[packedGradientsIndex[0]]->GetDataType(), shape, aggregationBuffer->Data(), offset * sizeof(ElemType), inputValues[packedGradientsIndex[0]]->Device()); valuesToAggregate.push_back(data); valuesAfterAggregate.push_back(data); } template void MPICommunicatorImpl::UnpackFromContinuousBuffer(Matrix* aggregationBuffer, const std::vector& outputValues, std::vector& packedGradientsIndex) { if (packedGradientsIndex.size() != 0) { size_t offset = 0; for (size_t i : packedGradientsIndex) { auto gradient = GetWritableMatrix(outputValues[i]); gradient->AssignValuesOf(aggregationBuffer->ColumnSlice(offset, gradient->GetNumElements()).Reshaped(gradient->GetNumRows(), gradient->GetNumCols())); offset += gradient->GetNumElements(); } } } template void MPICommunicatorImpl::AllReduceGradients(ElemType* inputData, ElemType* outputData, size_t numElements, std::vector &allReduceRequests, bool dataOnCPU) { if (m_nccl->IsSupported() && !dataOnCPU) { m_nccl->AllReduce(inputData, outputData, numElements); return; } if (m_mpi->UseGpuGdr()) { if (inputData == outputData) m_mpi->AllReduce(outputData, numElements); else m_mpi->AllReduce(inputData, outputData, numElements); return; } allReduceRequests.push_back(MPI_Request()); if (inputData == outputData) m_mpi->AllReduceAsync(outputData, numElements, &allReduceRequests.back()); else m_mpi->AllReduceAsync(inputData, outputData, numElements, &allReduceRequests.back()); } }