//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// Swig wrapper for deserializers, the file should be used only from cntk_py.i.
//

#pragma once

// Standard library dependencies; the Python/NumPy and CNTK reader headers are
// expected to come from cntk_py.i, the only intended consumer of this file.
#include <algorithm>
#include <cstring>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

namespace CNTK {

// A scope guard class that makes sure that the current thread
// has properly acquired GIL.
class GilStateGuard final
{
    PyGILState_STATE m_state;

public:
    GilStateGuard() : m_state(PyGILState_Ensure())
    {}

    ~GilStateGuard()
    {
        PyGILState_Release(m_state);
    }

private:
    GilStateGuard(const GilStateGuard&) = delete;
    GilStateGuard& operator=(const GilStateGuard&) = delete;
    GilStateGuard& operator=(GilStateGuard&&) = delete;
    GilStateGuard(GilStateGuard&& other) = delete;
};

typedef std::shared_ptr<PyObject> PyObjectPtr;

// Wraps a python object pointer into a shared pointer
// with thread safe destructor.
inline PyObjectPtr MakeShared(PyObject* object, bool increaseRefCount = true)
{
    if (increaseRefCount)
        Py_XINCREF(object);

    return PyObjectPtr(object, [](PyObject* p)
    {
        // The destructor can potentially be called on another thread (prefetch, i.e.
        // for sequence and chunk destructor).
        // We should make sure that the state of the thread is properly initialized
        // and GIL is acquired before using any Python API.
        GilStateGuard guard;
        Py_XDECREF(p);
    });
}

// Dense sequence that references some memory from a Python object.
// Makes sure the Python object is not released while the sequence exists.
struct DenseDataFromPy final : public DenseSequenceData
{
    DenseDataFromPy(void* ptr, unsigned int numberOfSamples, const PyObjectPtr& object)
        : DenseSequenceData(numberOfSamples), m_data(ptr), m_object(object)
    {}

    virtual const void* GetDataBuffer() { return m_data; }

    virtual const NDShape& GetSampleShape()
    {
        LogicError("All sequences have the same shape, please use stream.shape instead.");
    }

private:
    void* m_data;
    PyObjectPtr m_object;

    DenseDataFromPy(const DenseDataFromPy&) = delete;
    DenseDataFromPy& operator=(const DenseDataFromPy&) = delete;
    DenseDataFromPy& operator=(DenseDataFromPy&&) = delete;
    DenseDataFromPy(DenseDataFromPy&& other) = delete;
};

// Sparse sequence that references some memory from a Python object.
// Makes sure the Python object is not released while the sequence exists.
struct SparseDataFromPy final : public SparseSequenceData
{
    SparseDataFromPy(void* data, SparseIndexType* indices, SparseIndexType nonZeroCount,
                     unsigned int numberOfSamples, const PyObjectPtr& object)
        : SparseSequenceData(numberOfSamples), m_data(data), m_object(object)
    {
        m_indices = indices;
        m_totalNnzCount = nonZeroCount;
    }

    virtual const void* GetDataBuffer() { return m_data; }

    virtual const NDShape& GetSampleShape()
    {
        LogicError("All sequences have the same shape, please use stream.shape instead.");
    }

private:
    void* m_data;
    PyObjectPtr m_object;

    SparseDataFromPy(const SparseDataFromPy&) = delete;
    SparseDataFromPy& operator=(const SparseDataFromPy&) = delete;
    SparseDataFromPy& operator=(SparseDataFromPy&&) = delete;
    SparseDataFromPy(SparseDataFromPy&& other) = delete;
};

// A wrapper around a Python chunk.
// A Python chunk is a dictionary of the following form:
//   { stream name -> [numpy array | csr_matrix | list of numpy arrays | list of csr_matrices] }
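//
// For illustration only (the stream names here are made up): a chunk covering a
// dense 'features' stream and a sparse 'labels' stream could be returned as
//   { 'features' : float32 numpy array, or a list of float32 numpy arrays (one per sequence),
//     'labels'   : float32 csr_matrix,  or a list of float32 csr_matrices (one per sequence) }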
class SwigChunk final : public Chunk
{
    std::vector<StreamInformation> m_streamInfos;

    // A helper function that creates a dense sequence data from a Python array.
    SequenceDataPtr FromNumPy(PyArrayObject* array, const StreamInformation& info)
    {
        int rank = PyArray_NDIM(array);
        npy_intp* np_shape = PyArray_SHAPE(array);

        // In case rank is the same as in sample layout
        // the sequence length is 1, otherwise we take it from the first dimension.
        uint32_t numSamples = info.m_sampleLayout.Rank() == rank ? 1 : static_cast<uint32_t>(np_shape[0]);

        auto type = PyArray_TYPE(array);
        if (type != NPY_FLOAT32)
            RuntimeError("Only array of type float is currently supported.");

        return std::make_shared<DenseDataFromPy>(PyArray_DATA(array), numSamples, MakeShared((PyObject*)array));
    }

    // A helper function that creates a sparse sequence data from a Python csr_matrix.
    SequenceDataPtr FromCSR(PyObject* object, const StreamInformation& info)
    {
        PyObjectPtr data = GetProperty(object, "data");
        PyArrayObject* dataRaw = (PyArrayObject*)data.get();

        auto type = PyArray_TYPE(dataRaw);
        if (type != NPY_FLOAT32)
            RuntimeError("Only csr_matrix of float is currently supported.");

        PyObjectPtr indptr = GetProperty(object, "indptr");
        PyArrayObject* indptrRaw = (PyArrayObject*)indptr.get();

        PyObjectPtr indices = GetProperty(object, "indices");
        PyArrayObject* indicesRaw = (PyArrayObject*)indices.get();

        PyObjectPtr shape = GetProperty(object, "shape");
        auto numElements = PyTuple_GET_ITEM(shape.get(), 0);

        auto result = std::make_shared<SparseDataFromPy>(
            PyArray_DATA(dataRaw),
            (SparseIndexType*)PyArray_DATA(indicesRaw),
            static_cast<SparseIndexType>(PyArray_SIZE(dataRaw)),
            static_cast<uint32_t>(PyLong_AsLong(numElements)),
            MakeShared(object));

        // Checking the type of the indptr elements.
        type = PyArray_TYPE(indptrRaw);
        size_t elementSize = 0;
        switch (type)
        {
        case NPY_LONG:
            elementSize = NPY_SIZEOF_LONG;
            break;
        case NPY_INT:
            elementSize = NPY_SIZEOF_INT;
            break;
        default:
            RuntimeError("Unsupported index type '%d'", type);
        }

        if (elementSize != sizeof(SparseIndexType))
            RuntimeError("Number of bits for index is unsupported for type '%d'", type);

        // Filling in nnzCounts: convert the CSR row offsets (indptr) into
        // per-row non-zero counts.
        auto nnzCountsSize = PyArray_SIZE(indptrRaw);
        result->m_nnzCounts.resize(nnzCountsSize);
        memcpy(&result->m_nnzCounts[0], PyArray_DATA(indptrRaw), nnzCountsSize * elementSize);
        for (size_t i = 0; i < result->m_nnzCounts.size() - 1; ++i)
            result->m_nnzCounts[i] = result->m_nnzCounts[i + 1] - result->m_nnzCounts[i];
        result->m_nnzCounts.resize(result->m_nnzCounts.size() - 1);

        return result;
    }
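    // Illustrative example: for a csr_matrix whose indptr is [0, 2, 5, 7], the
    // conversion above turns the copied row offsets into per-row non-zero counts,
    // yielding m_nnzCounts = [2, 3, 2] (one entry per sample).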
    static std::wstring ToWString(PyObject* object)
    {
        if (object == nullptr)
            InvalidArgument("Null cannot be converted to string.");

        if (PyUnicode_Check(object))
            return std::wstring((wchar_t*)PyUnicode_AS_UNICODE(object), PyUnicode_GET_SIZE(object));

        if (PyString_Check(object)) // Non unicode string.
        {
            std::string tmp(PyString_AsString(object));
            return std::wstring(tmp.begin(), tmp.end());
        }

        RuntimeError("Expected a string, '%s' was provided.", object->ob_type->tp_name);
        return std::wstring(); // make compiler happy.
    }

public:
    SwigChunk(size_t chunkId, const std::vector<StreamInformation>& streamInfos, PyObject* chunk)
        : m_streamInfos(streamInfos), m_chunkId(chunkId)
    {
        m_pyChunk = MakeShared(chunk);

        // Chunks are dictionaries "stream name" -> data.
        if (!PyDict_Check(m_pyChunk.get()))
            RuntimeError("The chunk should be a dictionary of a stream name into a list of sequences");

        PyObject* key;
        PyObject* value;
        Py_ssize_t pos = 0;

        std::vector<PyObject*> pyData; // Remembering the py chunk data for each stream.
        pyData.resize(m_streamInfos.size(), nullptr);
        while (PyDict_Next(m_pyChunk.get(), &pos, &key, &value))
        {
            auto name = ToWString(key);
            auto it = std::find_if(m_streamInfos.begin(), m_streamInfos.end(),
                [name](const StreamInformation& s) { return s.m_name == name; });
            if (it == m_streamInfos.end())
                RuntimeError("Stream with name '%ls' does not exist", name.c_str());

            pyData[it - m_streamInfos.begin()] = value;
        }

        // Let's check that the size of the chunk across all streams
        // is the same.
        size_t size = GetFirstDimension(pyData.front());
        for (size_t i = 1; i < m_streamInfos.size(); i++)
        {
            auto currentSize = GetFirstDimension(pyData[i]);
            if (size != currentSize)
                RuntimeError("Please provide an equal number of sequences across all streams in the chunk"
                    ", currently stream '%ls' has %d sequences, whereas '%ls' - %d",
                    m_streamInfos.front().m_name.c_str(), (int)size,
                    m_streamInfos[i].m_name.c_str(), (int)currentSize);
        }

        // Now fill in the data for all streams.
        // The data for sequence i in stream j is at position i * numberOfStreams + j.
        m_data.resize(size * m_streamInfos.size());
        for (size_t i = 0; i < m_streamInfos.size(); i++)
            FillChunkData(i, pyData[i], size);
    }

    // Helper function that returns the first dimension of the object (list, numpy or csr_matrix).
    size_t GetFirstDimension(PyObject* o)
    {
        if (PyList_Check(o))
            return PyList_Size(o);
        else if (PyArray_Check(o))
        {
            PyArrayObject* array = (PyArrayObject*)o;
            npy_intp* np_shape = PyArray_SHAPE(array);
            return np_shape[0];
        }
        // TODO: profile, probably need to have some form of
        // vtable in here, same goes for other places where we use string comparisons.
        else if (o->ob_type->tp_name == std::string("csr_matrix"))
        {
            auto shape = GetProperty(o, "shape");
            return static_cast<size_t>(PyLong_AsLong(PyTuple_GET_ITEM(shape.get(), 0)));
        }
        else
            RuntimeError("Unexpected type %s, only list, numpy or csr_matrix are expected.", o->ob_type->tp_name);
        return 0;
    }

    // For a given stream, fills m_data with sequences taken from the Python data.
    void FillChunkData(size_t streamIndex, PyObject* o, size_t dataSize)
    {
        auto storageFormat = m_streamInfos[streamIndex].m_storageFormat;
        if (PyList_Check(o)) // Data is a list of sequences.
            FillDataWithSequences(streamIndex, o, dataSize);
        // Data is a numpy array of dense samples.
        else if (storageFormat == StorageFormat::Dense && PyArray_Check(o))
            FillDataWithDenseSamples(streamIndex, o, dataSize);
        // Data is a csr matrix of sparse samples.
        else if (storageFormat == StorageFormat::SparseCSC && o->ob_type->tp_name == std::string("csr_matrix"))
            FillDataWithSparseSamples(streamIndex, o, dataSize);
        else
            RuntimeError("Unexpected data type '%s'. Please use numpy arrays, csr_matrix or list of those.",
                o->ob_type->tp_name);
    }

    // Fills chunk data with dense samples.
    void FillDataWithDenseSamples(size_t streamIndex, PyObject* o, size_t dataSize)
    {
        const auto& info = m_streamInfos[streamIndex];
        PyArrayObject* array = (PyArrayObject*)o;
        int rank = PyArray_NDIM(array);

        auto type = PyArray_TYPE(array);
        if (type != NPY_FLOAT32)
            RuntimeError("Only float numbers are currently supported.");

        if (info.m_sampleLayout.Rank() + 1 != rank)
            RuntimeError("Dense data supported only as single sample per row.");

        for (size_t i = 0; i < dataSize; ++i)
        {
            auto d = (float*)PyArray_GETPTR1(array, i);
            auto sequence = std::make_shared<DenseDataFromPy>(d, 1, m_pyChunk);
            m_data[i * m_streamInfos.size() + streamIndex] = sequence;
        }
    }
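    // Illustrative note: for a stream with sample shape (3,), a (100, 3) float32
    // numpy array is treated above as 100 one-sample sequences, each referencing
    // one row of the array and holding m_pyChunk to keep that array alive.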
    // Fills chunk data with sparse samples.
    void FillDataWithSparseSamples(size_t streamIndex, PyObject* o, size_t dataSize)
    {
        PyObjectPtr pyData = GetProperty(o, "data");
        PyArrayObject* pyDataRaw = (PyArrayObject*)pyData.get();

        auto type = PyArray_TYPE(pyDataRaw);
        if (type != NPY_FLOAT32)
            RuntimeError("Only float numbers are currently supported.");

        auto data = (float*)PyArray_DATA(pyDataRaw);

        PyObjectPtr indices = GetProperty(o, "indices");
        SparseIndexType* indicesRaw = (SparseIndexType*)PyArray_DATA((PyArrayObject*)indices.get());

        PyObjectPtr indptr = GetProperty(o, "indptr");
        SparseIndexType* indptrRaw = (SparseIndexType*)PyArray_DATA((PyArrayObject*)indptr.get());

        for (size_t i = 0; i < dataSize; ++i)
        {
            auto sequence = std::make_shared<SparseDataFromPy>(
                data + indptrRaw[i], indicesRaw + indptrRaw[i],
                indptrRaw[i + 1] - indptrRaw[i], 1, m_pyChunk);
            sequence->m_nnzCounts.resize(1, indptrRaw[i + 1] - indptrRaw[i]);
            m_data[i * m_streamInfos.size() + streamIndex] = sequence;
        }
    }

    // Fills chunk data with sequences.
    void FillDataWithSequences(size_t streamIndex, PyObject* o, size_t dataSize)
    {
        const auto& info = m_streamInfos[streamIndex];
        auto storageFormat = info.m_storageFormat;
        m_sampleMode = false;
        for (size_t i = 0; i < dataSize; ++i)
        {
            PyObject* item = PyList_GetItem(o, i);
            SequenceDataPtr sequence = nullptr;
            if (storageFormat == StorageFormat::Dense)
            {
                if (!PyArray_Check(item))
                    RuntimeError("Expecting dense data to be represented as a numpy array.");

                sequence = FromNumPy((PyArrayObject*)item, info);
            }
            else // Sparse
            {
                if (item->ob_type->tp_name != std::string("csr_matrix"))
                    RuntimeError("Expecting sparse data to be represented as a csr_matrix.");

                sequence = FromCSR(item, info);
            }

            m_data[i * m_streamInfos.size() + streamIndex] = sequence;
        }
    }

    // Gets sequence infos for the chunk.
    void SequenceInfos(std::vector<SequenceInfo>& descriptions)
    {
        size_t numSequences = m_data.size() / m_streamInfos.size();
        descriptions.reserve(numSequences);
        if (m_sampleMode)
        {
            for (size_t i = 0; i < numSequences; ++i)
                descriptions.push_back(SequenceInfo{ i, 1, (ChunkIdType)m_chunkId });
        }
        else
        {
            // Find the stream, if any, that defines the minibatch size.
            const StreamInformation* pDefMbInfo = nullptr;
            for (const StreamInformation& info : m_streamInfos)
            {
                if (info.m_definesMbSize)
                {
                    if (pDefMbInfo == nullptr)
                        pDefMbInfo = &info;
                    else
                        RuntimeError("Only a single stream is allowed to define minibatch size, but at least two are found.");
                }
            }

            // Scan over the data to set sampleCount for each sequence.
            unsigned int sampleCount = 1;
            for (size_t i = 0, j = 0; i < m_data.size(); ++i)
            {
                // Note that stream streamIndex of sequence j is at m_data[j * m_streamInfos.size() + streamIndex].
                size_t streamIndex = i % m_streamInfos.size();
                if (pDefMbInfo == nullptr)
                    // No stream is specified to define the minibatch size; the number of samples in the
                    // sequence is defined by the stream with the maximum number of samples.
                    sampleCount = std::max(sampleCount, m_data[i]->m_numberOfSamples);
                else if (pDefMbInfo == &m_streamInfos[streamIndex])
                    // A stream is specified to define the minibatch size; the number of samples in the
                    // sequence is defined by this stream.
                    sampleCount = m_data[i]->m_numberOfSamples;

                // Last stream of the sequence, remember the accumulated sample count for the sequence.
                if (streamIndex == m_streamInfos.size() - 1)
                {
                    descriptions.push_back(SequenceInfo{ j++, sampleCount, (ChunkIdType)m_chunkId });
                    sampleCount = 1;
                }
            }
        }
    }
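    // Illustrative note: with two streams and three sequences, m_data is laid out as
    //   [ seq0/stream0, seq0/stream1, seq1/stream0, seq1/stream1, seq2/stream0, seq2/stream1 ],
    // so the lookup below for sequence 1 appends { seq1/stream0, seq1/stream1 } to the result.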
    // Gets sequence data for a given sequence index across all streams.
    void GetSequence(size_t sequenceIndex, std::vector<SequenceDataPtr>& result) override
    {
        auto offset = m_data.data() + sequenceIndex * m_streamInfos.size();
        result.insert(result.end(), offset, offset + m_streamInfos.size());
    }

    // Gets a property of a python object by name.
    PyObjectPtr GetProperty(PyObject* object, const std::string& propertyName)
    {
        // TODO: profile, probably need to have some form of
        // vtable in here, same goes for other places where we use string comparisons.
        auto result = PyObject_GetAttrString(object, propertyName.c_str());
        if (!result)
            RuntimeError("PyObject does not have property '%s'.", propertyName.c_str());

        // PyObject_GetAttrString() increases the refcount internally, so when we wrap the pointer in shared_ptr,
        // we pass false to not increase refcount again. During delete, the refcount is decreased normally in shared_ptr's deleter.
        return MakeShared(result, false);
    }

private:
    size_t m_chunkId;                        // Chunk id.
    bool m_sampleMode = true;                // True, if the data is in sample form.
    std::shared_ptr<PyObject> m_pyChunk;     // Python chunk data.
    std::vector<SequenceDataPtr> m_data;     // Sequence data for each sequence in chunk.
};

// Swig deserializer is used to expose user defined deserializers
// to Python.
class SwigDataDeserializer : public DataDeserializer
{
    std::vector<StreamInformation> m_streamInfos;
    std::once_flag m_streamInfosInitFlag;

    std::vector<ChunkInfo> m_chunkInfos;
    std::once_flag m_chunkInfosInitFlag;

public:
    SwigDataDeserializer() { }

    // Interface implemented in Python.
    virtual void _GetStreamInfos(std::vector<StreamInformation>&) { NOT_IMPLEMENTED; }
    virtual void _GetChunkInfos(std::vector<ChunkInfo>&) { NOT_IMPLEMENTED; }
    virtual PyObject* _GetChunk(ChunkIdType chunkId) { NOT_IMPLEMENTED; return nullptr; }

    // Simple python redirectors.
    std::vector<StreamInformation> StreamInfos() override
    {
        std::call_once(m_streamInfosInitFlag, [this]() { _GetStreamInfos(m_streamInfos); });
        return m_streamInfos;
    }

    std::vector<ChunkInfo> ChunkInfos() override
    {
        std::call_once(m_chunkInfosInitFlag, [this]() { _GetChunkInfos(m_chunkInfos); });
        return m_chunkInfos;
    }

    ChunkPtr GetChunk(ChunkIdType chunkId)
    {
        auto chunk = _GetChunk(chunkId);
        GilStateGuard guard;
        return std::make_shared<SwigChunk>(chunkId, m_streamInfos, chunk);
    }

    bool GetSequenceInfo(const SequenceInfo& primary, SequenceInfo& description) override
    {
        NOT_IMPLEMENTED;
        return false;
    }

    void SequenceInfosForChunk(ChunkIdType chunkId, std::vector<SequenceInfo>& descriptions) override
    {
        NOT_IMPLEMENTED;
    }
};

}