diff --git a/DeviceAdapters/DemoCamera/DemoCamera.cpp b/DeviceAdapters/DemoCamera/DemoCamera.cpp index 0a9fb3fdf..20e65a82b 100644 --- a/DeviceAdapters/DemoCamera/DemoCamera.cpp +++ b/DeviceAdapters/DemoCamera/DemoCamera.cpp @@ -54,6 +54,7 @@ const char* g_DADeviceName = "D-DA"; const char* g_DA2DeviceName = "D-DA2"; const char* g_GalvoDeviceName = "DGalvo"; const char* g_MagnifierDeviceName = "DOptovar"; +const char* g_DataStreamerDeviceName = "DDataStreamer"; const char* g_HubDeviceName = "DHub"; // constants for naming pixel types (allowed values of the "PixelType" property) @@ -89,6 +90,7 @@ MODULE_API void InitializeModuleData() RegisterDevice(g_DA2DeviceName, MM::SignalIODevice, "Demo DA-2"); RegisterDevice(g_MagnifierDeviceName, MM::MagnifierDevice, "Demo Optovar"); RegisterDevice(g_GalvoDeviceName, MM::GalvoDevice, "Demo Galvo"); + RegisterDevice(g_DataStreamerDeviceName, MM::DataStreamerDevice, "Demo DataStreamer"); RegisterDevice("TransposeProcessor", MM::ImageProcessorDevice, "TransposeProcessor"); RegisterDevice("ImageFlipX", MM::ImageProcessorDevice, "ImageFlipX"); RegisterDevice("ImageFlipY", MM::ImageProcessorDevice, "ImageFlipY"); @@ -167,6 +169,11 @@ MODULE_API MM::Device* CreateDevice(const char* deviceName) // create Galvo return new DemoGalvo(); } + else if (strcmp(deviceName, g_DataStreamerDeviceName) == 0) + { + // create DataStreamer + return new DemoDataStreamer(); + } else if(strcmp(deviceName, "TransposeProcessor") == 0) { @@ -4669,6 +4676,239 @@ bool DemoGalvo::PointInTriangle(Point p, Point p0, Point p1, Point p2) return s > 0 && t > 0 && (s + t) < A; } +/////////////////////////////////////////////////////////// +// DemoDataStreamer +DemoDataStreamer::DemoDataStreamer() : + mockDataSize_(1024), + acqPeriod_(1000), + procPeriod_(1000), + errorGetBufferSizeAt_(65535), + errorGetBufferAt_(65535), + errorProcessBufferAt_(65535), + initialized_(false) +{ + // parent ID display + CreateHubIDProperty(); +} + +DemoDataStreamer::~DemoDataStreamer() +{ + Shutdown(); +} + +void DemoDataStreamer::GetName(char* pszName) const { + CDeviceUtils::CopyLimitedString(pszName, g_DataStreamerDeviceName); +} + +bool DemoDataStreamer::Busy() { + return false; +} + +int DemoDataStreamer::Initialize() +{ + + SetErrorText(errorCodeGetBufferSize, "GetBufferSize error message"); + SetErrorText(errorCodeGetBuffer, "GetBuffer error message"); + SetErrorText(errorCodeProcessBuffer, "ProcessBuffer error message"); + + int ret; + ret = CreateFloatProperty("Average data value", NAN, false); + assert(ret == DEVICE_OK); + + CPropertyAction* pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnAcquisitionPeriod); + ret = CreateIntegerProperty("Acquisition period in ms", acqPeriod_, false, pAct); + assert(ret == DEVICE_OK); + SetPropertyLimits("Acquisition period in ms", 0, 10000); + + pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnProcessingPeriod); + ret = CreateIntegerProperty("Processing period in ms", procPeriod_, false, pAct); + assert(ret == DEVICE_OK); + SetPropertyLimits("Processing period in ms", 0, 10000); + + pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnGenerateGetBufferSizeErrorAt); + ret = CreateIntegerProperty("Generate GetBufferSize error at", errorGetBufferSizeAt_, false, pAct); + assert(ret == DEVICE_OK); + SetPropertyLimits("Generate GetBufferSize error at", 1, 65535); + + pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnGenerateGetBufferErrorAt); + ret = CreateIntegerProperty("Generate GetBuffer error at", errorGetBufferAt_, false, pAct); + assert(ret == DEVICE_OK); + SetPropertyLimits("Generate GetBuffer error at", 1, 65535); + + pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnGenerateProcessBufferErrorAt); + ret = CreateIntegerProperty("Generate ProcessBuffer error at", errorProcessBufferAt_, false, pAct); + assert(ret == DEVICE_OK); + SetPropertyLimits("Generate ProcessBuffer error at", 1, 65535); + + initialized_ = true; + return DEVICE_OK; +} + +int DemoDataStreamer::Shutdown() { + initialized_ = false; + return DEVICE_OK; +} + +int DemoDataStreamer::StartStream() { + // calls to hardware should be implemented here + counter_ = 1; + int ret; + ret = this->StartDataStreamerThreads(); // this line must be present in every StartStream implementation + return ret; +} + +int DemoDataStreamer::StopStream() { + // calls to hardware should be implemented here + int ret; + ret = this->StopDataStreamerThreads(); // this line must be present in every StopStream implementation + return ret; +} + +int DemoDataStreamer::GetBufferSize(unsigned& dataBufferSize) { + if (counter_ == errorGetBufferSizeAt_) return errorCodeGetBufferSize; + dataBufferSize = mockDataSize_; + return DEVICE_OK; +} + +std::unique_ptr DemoDataStreamer::GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitStatus) { + + if (counter_ == errorGetBufferAt_) { + actualDataBufferSize = 0; + exitStatus = errorCodeGetBuffer; + return 0; + } + + if (expectedDataBufferSize <= mockDataSize_) { + actualDataBufferSize = expectedDataBufferSize; + } + else { + actualDataBufferSize = mockDataSize_; + } + + // allocate a new data array and put data in it + std::unique_ptr data(new char[actualDataBufferSize]); + int* ptr = (int*)data.get(); + for (size_t ii = 0; ii < actualDataBufferSize/4; ii++) { + *ptr = counter_; + ptr++; + } + counter_++; + exitStatus = DEVICE_OK; + Sleep(acqPeriod_); + + return data; +} + +int DemoDataStreamer::ProcessBuffer(std::unique_ptr& pDataBuffer, unsigned dataSize) { + if (counter_ == errorProcessBufferAt_) return errorCodeProcessBuffer; + double ave = 0; + int* ptr = (int*)pDataBuffer.get(); + for (size_t ii = 0; ii < dataSize/4; ii++) { + ave += *ptr; + ptr++; + } + ave = ave / (dataSize/4); + SetProperty("Average data value", to_string(ave).c_str()); + Sleep(procPeriod_); + return DEVICE_OK; +} + +/** +* Handles "Acquisition period in ms" property. +*/ +int DemoDataStreamer::OnAcquisitionPeriod(MM::PropertyBase* pProp, MM::ActionType eAct) +{ + if (eAct == MM::BeforeGet) + { + pProp->Set(acqPeriod_); + } + else if (eAct == MM::AfterSet) + { + if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + long newPeriod; + pProp->Get(newPeriod); + if (newPeriod>pProp->GetUpperLimit() || newPeriodGetLowerLimit()) + { + pProp->Set(acqPeriod_); // revert + return DEVICE_INVALID_PROPERTY_VALUE; + } + acqPeriod_ = newPeriod; + } + return DEVICE_OK; +} + +/** +* Handles "Acquisition period in ms" property. +*/ +int DemoDataStreamer::OnProcessingPeriod(MM::PropertyBase* pProp, MM::ActionType eAct) +{ + if (eAct == MM::BeforeGet) + { + pProp->Set(procPeriod_); + } + else if (eAct == MM::AfterSet) + { + if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + long newPeriod; + pProp->Get(newPeriod); + if (newPeriod > pProp->GetUpperLimit() || newPeriod < pProp->GetLowerLimit()) + { + pProp->Set(procPeriod_); // revert + return DEVICE_INVALID_PROPERTY_VALUE; + } + procPeriod_ = newPeriod; + } + return DEVICE_OK; +} + +int DemoDataStreamer::OnGenerateGetBufferSizeErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct) +{ + if (eAct == MM::BeforeGet) + { + pProp->Set(errorGetBufferSizeAt_); + } + else if (eAct == MM::AfterSet) + { + if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + long newval; + pProp->Get(newval); + errorGetBufferSizeAt_ = newval; + } + return DEVICE_OK; +} + +int DemoDataStreamer::OnGenerateGetBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct) +{ + if (eAct == MM::BeforeGet) + { + pProp->Set(errorGetBufferAt_); + } + else if (eAct == MM::AfterSet) + { + if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + long newval; + pProp->Get(newval); + errorGetBufferAt_ = newval; + } + return DEVICE_OK; +} + +int DemoDataStreamer::OnGenerateProcessBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct) +{ + if (eAct == MM::BeforeGet) + { + pProp->Set(errorProcessBufferAt_); + } + else if (eAct == MM::AfterSet) + { + if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + long newval; + pProp->Get(newval); + errorProcessBufferAt_ = newval; + } + return DEVICE_OK; +} + ////////// BEGINNING OF POORLY ORGANIZED CODE ////////////// ////////// CLEANUP NEEDED //////////////////////////// diff --git a/DeviceAdapters/DemoCamera/DemoCamera.h b/DeviceAdapters/DemoCamera/DemoCamera.h index acd3df13a..bcbc026f4 100644 --- a/DeviceAdapters/DemoCamera/DemoCamera.h +++ b/DeviceAdapters/DemoCamera/DemoCamera.h @@ -38,6 +38,7 @@ #include #include #include +#include ////////////////////////////////////////////////////////////////////////////// // Error codes @@ -1194,6 +1195,48 @@ class DemoGalvo : public CGalvoBase, ImgManipulator double vMaxY_; }; +////////////////////////////////////////////////////////////////////////////// +// DemoDataStreamer class +// Simulation of data streamer device +////////////////////////////////////////////////////////////////////////////// +class DemoDataStreamer : public CDataStreamerBase +{ +public: + DemoDataStreamer(); + ~DemoDataStreamer(); + + int Initialize(); + int Shutdown(); + void GetName(char* pszName) const; + bool Busy(); + + int StartStream(); + int StopStream(); + int GetBufferSize(unsigned& dataBufferSize); + std::unique_ptr GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitStatus); + int ProcessBuffer(std::unique_ptr& pDataBuffer, unsigned dataSize); + + // action interface + // ---------------- + int OnAcquisitionPeriod(MM::PropertyBase* pProp, MM::ActionType eAct); + int OnProcessingPeriod(MM::PropertyBase* pProp, MM::ActionType eAct); + int OnGenerateGetBufferSizeErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct); + int OnGenerateGetBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct); + int OnGenerateProcessBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct); + +private: + bool initialized_; + unsigned mockDataSize_; + unsigned counter_; + long acqPeriod_; + long procPeriod_; + long errorGetBufferSizeAt_; + long errorGetBufferAt_; + long errorProcessBufferAt_; + const int errorCodeGetBufferSize = 901; + const int errorCodeGetBuffer = 902; + const int errorCodeProcessBuffer = 903; +}; #endif //_DEMOCAMERA_H_ diff --git a/MMCore/CoreUtils.h b/MMCore/CoreUtils.h index d8b6718e5..6a592c423 100644 --- a/MMCore/CoreUtils.h +++ b/MMCore/CoreUtils.h @@ -72,6 +72,7 @@ inline std::string ToString(const MM::DeviceType d) case MM::SLMDevice: return "SLM"; case MM::HubDevice: return "Hub"; case MM::GalvoDevice: return "Galvo"; + case MM::DataStreamerDevice: return "DataStreamer"; } return "Invalid"; } diff --git a/MMCore/Devices/DataStreamerInstance.cpp b/MMCore/Devices/DataStreamerInstance.cpp new file mode 100644 index 000000000..52f600154 --- /dev/null +++ b/MMCore/Devices/DataStreamerInstance.cpp @@ -0,0 +1,41 @@ +// PROJECT: Micro-Manager +// SUBSYSTEM: MMCore +// +// DESCRIPTION: Galvo device instance wrapper +// +// COPYRIGHT: University of California, San Francisco, 2014, +// All Rights reserved +// +// LICENSE: This file is distributed under the "Lesser GPL" (LGPL) license. +// License text is included with the source distribution. +// +// This file is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// +// IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES. +// +// AUTHOR: Mark Tsuchida + +#include "DataStreamerInstance.h" + + +int DataStreamerInstance::GetBufferSize(unsigned& dataBufferSiize) { return GetImpl()->GetBufferSize(dataBufferSiize); } +std::unique_ptr DataStreamerInstance::GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitStatus) { return GetImpl()->GetBuffer(expectedDataBufferSize,actualDataBufferSize,exitStatus); } +int DataStreamerInstance::ProcessBuffer(std::unique_ptr& pDataBuffer, unsigned dataSize) { return GetImpl()->ProcessBuffer(pDataBuffer, dataSize); } +int DataStreamerInstance::StartStream() { return GetImpl()->StartStream(); } +int DataStreamerInstance::StopStream() { return GetImpl()->StopStream(); } +bool DataStreamerInstance::GetOverflowStatus() { return GetImpl()->GetOverflowStatus(); } +int DataStreamerInstance::ResetOverflowStatus() { return GetImpl()->ResetOverflowStatus(); } +int DataStreamerInstance::GetAcquisitionExitStatus() { return GetImpl()->GetAcquisitionExitStatus(); } +int DataStreamerInstance::GetProcessingExitStatus() { return GetImpl()->GetProcessingExitStatus(); } +int DataStreamerInstance::SetAcquisitionPause(bool pause) { return GetImpl()->SetAcquisitionPause(pause); } +bool DataStreamerInstance::GetAcquisitionPause() { return GetImpl()->GetAcquisitionPause(); } +bool DataStreamerInstance::IsStreaming() { return GetImpl()->IsStreaming(); } +int DataStreamerInstance::SetStreamParameters(bool stopOnOverflow, bool pauseAcquisitionBeforeOverflow, int numberOfBuffers, int durationUs, int updatePeriodUs) { return GetImpl()->SetStreamParameters(stopOnOverflow, pauseAcquisitionBeforeOverflow, numberOfBuffers, durationUs, updatePeriodUs); } +int DataStreamerInstance::GetStreamParameters(bool& stopOnOverflow, bool& pauseAcquisitionBeforeOverflow, int& numberOfBuffers, int& durationUs, int& updatePeriodUs) { return GetImpl()->GetStreamParameters(stopOnOverflow,pauseAcquisitionBeforeOverflow,numberOfBuffers,durationUs,updatePeriodUs); } +int DataStreamerInstance::SetCircularAcquisitionBufferCapacity(int capacity) { return GetImpl()->SetCircularAcquisitionBufferCapacity(capacity); } +int DataStreamerInstance::GetCircularAcquisitionBufferCapacity() { return GetImpl()->GetCircularAcquisitionBufferCapacity(); } + diff --git a/MMCore/Devices/DataStreamerInstance.h b/MMCore/Devices/DataStreamerInstance.h new file mode 100644 index 000000000..067a02ff4 --- /dev/null +++ b/MMCore/Devices/DataStreamerInstance.h @@ -0,0 +1,55 @@ +// PROJECT: Micro-Manager +// SUBSYSTEM: MMCore +// +// COPYRIGHT: University of California, San Francisco, 2014, +// All Rights reserved +// +// LICENSE: This file is distributed under the "Lesser GPL" (LGPL) license. +// License text is included with the source distribution. +// +// This file is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// +// IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES. +// +// AUTHOR: Mark Tsuchida + +#pragma once + +#include "DeviceInstanceBase.h" + + +class DataStreamerInstance : public DeviceInstanceBase +{ +public: + DataStreamerInstance(CMMCore* core, + std::shared_ptr adapter, + const std::string& name, + MM::Device* pDevice, + DeleteDeviceFunction deleteFunction, + const std::string& label, + mm::logging::Logger deviceLogger, + mm::logging::Logger coreLogger) : + DeviceInstanceBase(core, adapter, name, pDevice, deleteFunction, label, deviceLogger, coreLogger) + {} + + int GetBufferSize(unsigned& dataBufferSiize); + std::unique_ptr GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitStatus); + int ProcessBuffer(std::unique_ptr& pDataBuffer, unsigned dataSize); + int StartStream(); + int StopStream(); + bool GetOverflowStatus(); + int ResetOverflowStatus(); + int GetAcquisitionExitStatus(); + int GetProcessingExitStatus(); + int SetAcquisitionPause(bool pause); + bool GetAcquisitionPause(); + bool IsStreaming(); + int SetStreamParameters(bool stopOnOverflow, bool pauseAcquisitionBeforeOverflow, int numberOfBuffers, int durationUs, int updatePeriodUs); + int GetStreamParameters(bool& stopOnOverflow, bool& pauseAcquisitionBeforeOverflow, int& numberOfBuffers, int& durationUs, int& updatePeriodUs); + int SetCircularAcquisitionBufferCapacity(int capacity); + int GetCircularAcquisitionBufferCapacity(); +}; diff --git a/MMCore/Devices/DeviceInstances.h b/MMCore/Devices/DeviceInstances.h index 8dfacf4d7..70d7e894d 100644 --- a/MMCore/Devices/DeviceInstances.h +++ b/MMCore/Devices/DeviceInstances.h @@ -32,4 +32,5 @@ #include "MagnifierInstance.h" #include "SLMInstance.h" #include "GalvoInstance.h" +#include "DataStreamerInstance.h" #include "HubInstance.h" diff --git a/MMCore/LoadableModules/LoadedDeviceAdapter.cpp b/MMCore/LoadableModules/LoadedDeviceAdapter.cpp index caf2a3404..389fd207c 100644 --- a/MMCore/LoadableModules/LoadedDeviceAdapter.cpp +++ b/MMCore/LoadableModules/LoadedDeviceAdapter.cpp @@ -183,6 +183,8 @@ LoadedDeviceAdapter::LoadDevice(CMMCore* core, const std::string& name, return std::make_shared(core, shared_this, name, pDevice, deleter, label, deviceLogger, coreLogger); case MM::GalvoDevice: return std::make_shared(core, shared_this, name, pDevice, deleter, label, deviceLogger, coreLogger); + case MM::DataStreamerDevice: + return std::make_shared(core, shared_this, name, pDevice, deleter, label, deviceLogger, coreLogger); case MM::HubDevice: return std::make_shared(core, shared_this, name, pDevice, deleter, label, deviceLogger, coreLogger); default: diff --git a/MMCore/MMCore.cpp b/MMCore/MMCore.cpp index eb55d5391..a069f553d 100644 --- a/MMCore/MMCore.cpp +++ b/MMCore/MMCore.cpp @@ -6411,6 +6411,198 @@ string CMMCore::getGalvoChannel(const char* deviceLabel) throw (CMMError) /* SYSTEM STATE */ +/** +* Start acquisition with the specified data streamer. +*/ +void CMMCore::startStream(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Starting streaming of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->StartStream(); + if (ret != DEVICE_OK) throw CMMError(getDeviceErrorText(ret, pDataStreamer).c_str(), MMERR_DEVICE_GENERIC); +} + +/** +* Stop acquisition with the specified data streamer. +*/ +void CMMCore::stopStream(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Stopping streaming of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->StopStream(); + if (ret != DEVICE_OK) throw CMMError(getDeviceErrorText(ret, pDataStreamer).c_str(), MMERR_DEVICE_GENERIC); +} + +/** +* Get overflow status of the specified data streamer. +*/ +bool CMMCore::getOverflowStatus(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Getting overflow status of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + return pDataStreamer->GetOverflowStatus(); +} + +/** +* Reset acquisition with the specified data streamer. +*/ +void CMMCore::resetOverflowStatus(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Reseting overflow status of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->ResetOverflowStatus(); + if (ret != DEVICE_OK) throw CMMError(getDeviceErrorText(ret, pDataStreamer).c_str(), MMERR_DEVICE_GENERIC); +} + +/** +* Get pause status of the specified data streamer. +*/ +bool CMMCore::getAcquisitionPause(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Checking pause status of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->GetAcquisitionPause(); + return ret; +} + +/** +* Set acquisition pause on the specified data streamer. +*/ +void CMMCore::setAcquisitionPause(const char* dataStreamerLabel, bool pause) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Setting acquisition pause of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->SetAcquisitionPause(pause); + if (ret != DEVICE_OK) throw CMMError(getDeviceErrorText(ret, pDataStreamer).c_str(), MMERR_DEVICE_GENERIC); +} + +/** +* Get streaming status of the specified data streamer. +*/ +bool CMMCore::isStreaming(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Checking streaming status of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->IsStreaming(); + return ret; +} + +/** +* Get acquisition thread exit status +*/ +int CMMCore::getAcquisitionExitStatus(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Getting stream exit status of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->GetAcquisitionExitStatus(); + return ret; +} + +/** +* Get acquisition thread exit status +*/ +int CMMCore::getProcessingExitStatus(const char* dataStreamerLabel) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Getting stream exit status of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->GetProcessingExitStatus(); + return ret; +} + +/** + * Sets data streamer parameters + * @param label the data streamer device label + * @param stopOnOverflow stop streaming if circular streaming buffer overflows + * @param numberOfBlocks number of data blocks to collect + * @param durationUs collection duration in microseconds + * @param updatePeriodUs update period in microseconds + */ +void CMMCore::setStreamParameters(const char* dataStreamerLabel, bool stopOnOverflow, bool pauseAcquisitionBeforeOverflow, + int numberOfBlocks, int durationMs, int updatePeriodMs) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + + LOG_DEBUG(coreLogger_) << "Setting stream parameters of " << dataStreamerLabel << + " to stopOnOverflow=" << stopOnOverflow << ", " << + "pauseAcquisitionBeforeOverflow=" << pauseAcquisitionBeforeOverflow << ", " << + "numberOfBlocks=" << numberOfBlocks << ", " << + "durationMs=" << durationMs << ", " << + "updatePeriodMs=" << updatePeriodMs; + + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->SetStreamParameters(stopOnOverflow, pauseAcquisitionBeforeOverflow, numberOfBlocks, durationMs, updatePeriodMs); + if (ret != DEVICE_OK) throw CMMError(getDeviceErrorText(ret, pDataStreamer).c_str(), MMERR_DEVICE_GENERIC); +} + +/** + * Obtains current parameters of the data streamer. + * @param label the data streamer device label + * @param stopOnOverflow a return parameter indicating if streaming should stop when acquisition buffer overflow takes place + * @param numberOfBlocks a return parameter that gives the number of blocks to be acquired + * @param durationUs a return parameter that gives duration of the stream + * @param updatePeriodUs a return parameter that gives the update rate of the stream + */ +void CMMCore::getStreamParameters(const char* dataStreamerLabel, bool& stopOnOverflow, bool& pauseAcquisitionBeforeOverflow, + int& numberOfBlocks, int& durationMs, int& updatePeriodMs) throw (CMMError) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Getting stream parameters of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->GetStreamParameters(stopOnOverflow, pauseAcquisitionBeforeOverflow, numberOfBlocks, durationMs, updatePeriodMs); + if (ret != DEVICE_OK) throw CMMError(getDeviceErrorText(ret, pDataStreamer).c_str(), MMERR_DEVICE_GENERIC); +} + +/** + * Sets circular acquisition buffer capacity + * @param label the data streamer device label + * @param capacity number of data blocks in the circular buffer + */ +void CMMCore::setCircularAcquisitionBufferCapacity(const char* dataStreamerLabel, int capacity) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Setting circular acquisition buffer capacity of " << dataStreamerLabel << " to " << capacity; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->SetCircularAcquisitionBufferCapacity(capacity); + if (ret != DEVICE_OK) throw CMMError(getDeviceErrorText(ret, pDataStreamer).c_str(), MMERR_DEVICE_GENERIC); +} + +/** + * Returns circular acquisition buffer capacity + * @param label the data streamer device label + */ +long CMMCore::getCircularAcquisitionBufferCapacity(const char* dataStreamerLabel) +{ + std::shared_ptr pDataStreamer = deviceManager_->GetDeviceOfType(dataStreamerLabel); + LOG_DEBUG(coreLogger_) << "Getting circular acquisition buffer capacity of " << dataStreamerLabel; + mm::DeviceModuleLockGuard guard(pDataStreamer); + int ret = pDataStreamer->GetCircularAcquisitionBufferCapacity(); + return ret; +} + +/** +* Get error code message +*/ +std::string CMMCore::getErrorMessage(const char* deviceLabel, int code) throw (CMMError) +{ + std::shared_ptr pDevice = deviceManager_->GetDevice(deviceLabel); + mm::DeviceModuleLockGuard guard(pDevice); + if (code == DEVICE_OK) { + return string("Device ok."); + } + else { + return pDevice->GetErrorText(code); + } +} + /** * Saves the current system state to a text file of the MM specific format. * The file records only read-write properties. diff --git a/MMCore/MMCore.h b/MMCore/MMCore.h index 71addcb64..3447ae975 100644 --- a/MMCore/MMCore.h +++ b/MMCore/MMCore.h @@ -609,6 +609,30 @@ class CMMCore std::string getGalvoChannel(const char* galvoLabel) throw (CMMError); ///@} + /** \name DataStreamer control. + * + * Control of data streaming devices. + */ + ///@{ + void startStream(const char* dataStreamerLabel) throw (CMMError); + void stopStream(const char* dataStreamerLabel) throw (CMMError); + bool getOverflowStatus(const char* dataStreamerLabel) throw (CMMError); + void resetOverflowStatus(const char* dataStreamerLabel) throw (CMMError); + bool getAcquisitionPause(const char* dataStreamerLabel) throw (CMMError); + void setAcquisitionPause(const char* dataStreamerLabel, bool pause) throw (CMMError); + bool isStreaming(const char* dataStreamerLabel) throw (CMMError); + int getAcquisitionExitStatus(const char* dataStreamerLabel) throw (CMMError); + int getProcessingExitStatus(const char* dataStreamerLabel) throw (CMMError); + void setStreamParameters(const char* dataStreamerLabel, bool stopOnOverflow, bool pauseAcquisitionBeforeOverflow, + int numberOfBlocks, int durationMs, int updatePeriodMs) throw (CMMError); + void getStreamParameters(const char* dataStreamerLabel, bool& stopOnOverflow, bool& pauseAcquisitionBeforeOverflow, + int& numberOfBlocks, int& durationMs, int& updatePeriodMs) throw (CMMError); + void setCircularAcquisitionBufferCapacity(const char* dataStreamerLabel, int capacity) throw (CMMError); + long getCircularAcquisitionBufferCapacity(const char* dataStreamerLabel) throw (CMMError); + + std::string getErrorMessage(const char* deviceLabel, int code) throw (CMMError); + ///@} + /** \name Device discovery. */ ///@{ bool supportsDeviceDetection(char* deviceLabel); diff --git a/MMCore/MMCore.vcxproj b/MMCore/MMCore.vcxproj index 0b2428216..c5369b973 100644 --- a/MMCore/MMCore.vcxproj +++ b/MMCore/MMCore.vcxproj @@ -86,6 +86,7 @@ + @@ -126,6 +127,7 @@ + diff --git a/MMCore/MMCore.vcxproj.filters b/MMCore/MMCore.vcxproj.filters index a01ede1fd..65f1987a8 100644 --- a/MMCore/MMCore.vcxproj.filters +++ b/MMCore/MMCore.vcxproj.filters @@ -141,6 +141,9 @@ Source Files + + Source Files\Devices + @@ -305,5 +308,8 @@ Header Files + + Header Files\Devices + - + \ No newline at end of file diff --git a/MMCoreJ_wrap/MMCoreJ.i b/MMCoreJ_wrap/MMCoreJ.i index 84a71459b..66521b138 100644 --- a/MMCoreJ_wrap/MMCoreJ.i +++ b/MMCoreJ_wrap/MMCoreJ.i @@ -287,6 +287,11 @@ %apply int &OUTPUT { int &y }; %apply int &OUTPUT { int &xSize }; %apply int &OUTPUT { int &ySize }; +%apply bool &OUTPUT { bool &stopOnOverflow }; +%apply bool &OUTPUT { bool &pauseBeforeOverflow }; +%apply int &OUTPUT { int &numberOfBlocks }; +%apply int &OUTPUT { int &durationMs }; +%apply int &OUTPUT { int &updatePeriodMs }; // Java typemap diff --git a/MMDevice/DeviceBase.h b/MMDevice/DeviceBase.h index 4321f5945..7c266fda5 100644 --- a/MMDevice/DeviceBase.h +++ b/MMDevice/DeviceBase.h @@ -34,6 +34,8 @@ #include "ModuleInterface.h" #include "DeviceThreads.h" +#include + #include #include @@ -80,6 +82,11 @@ const char* const g_Msg_DEVICE_DUPLICATE_LIBRARY="Duplicate Device Library Name" const char* const g_Msg_DEVICE_PROPERTY_NOT_SEQUENCEABLE="This property is not sequenceable"; const char* const g_Msg_DEVICE_SEQUENCE_TOO_LARGE="Sequence is too large for this device"; const char* const g_Msg_DEVICE_NOT_YET_IMPLEMENTED="This command has not yet been implemented for this device."; +const char* const g_Msg_DEVICE_DATASTREAMER_BUSY_ACQUIRING = "Data streamer is running"; +const char* const g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_USER_SELECTION = "Stopped on user request"; +const char* const g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_OVERFLOW = "Stopped on circular acquisition buffer overflow"; +const char* const g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_NBLOCKS_COLLECTED = "Stopped on collecting specified number of blocks"; +const char* const g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_TIME_ELAPSED = "Stopped on DataStreamer time elapsed"; inline long nint( double value ) { @@ -947,6 +954,11 @@ class CDeviceBase : public T SetErrorText(DEVICE_PROPERTY_NOT_SEQUENCEABLE, g_Msg_DEVICE_PROPERTY_NOT_SEQUENCEABLE); SetErrorText(DEVICE_SEQUENCE_TOO_LARGE, g_Msg_DEVICE_SEQUENCE_TOO_LARGE); SetErrorText(DEVICE_NOT_YET_IMPLEMENTED, g_Msg_DEVICE_NOT_YET_IMPLEMENTED); + SetErrorText(DEVICE_DATASTREAMER_BUSY_ACQUIRING, g_Msg_DEVICE_DATASTREAMER_BUSY_ACQUIRING); + SetErrorText(DEVICE_DATASTREAMER_STOPPED_ON_USER_SELECTION, g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_USER_SELECTION); + SetErrorText(DEVICE_DATASTREAMER_STOPPED_ON_OVERFLOW, g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_OVERFLOW); + SetErrorText(DEVICE_DATASTREAMER_STOPPED_ON_NBLOCKS_COLLECTED, g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_NBLOCKS_COLLECTED); + SetErrorText(DEVICE_DATASTREAMER_STOPPED_ON_TIME_ELAPSED, g_Msg_DEVICE_DATASTREAMER_STOPPED_ON_TIME_ELAPSED); } /** @@ -2161,6 +2173,494 @@ class CGalvoBase : public CDeviceBase double GetYMinimum() { return 0.0;}; }; +typedef struct { + std::unique_ptr data; + unsigned int expectedSize; + unsigned int actualSize; +} acquired_block; + +/** +* Helper class for storing data blocks acquired by DataStreamer devices +*/ +class AcquiredBlockCollection { +public: + AcquiredBlockCollection(unsigned int maxNumberOfBlocks, bool stopOnOverflow) : + overflowStatus_(false) + { + maxNumberOfBlocks_ = maxNumberOfBlocks; + cb_.set_capacity(maxNumberOfBlocks_); + stopOnOverflow_ = stopOnOverflow; + } + + ~AcquiredBlockCollection() {} + + void Add(std::unique_ptr &pDataBlock) + { + MMThreadGuard g(this->executeLock_); + if ((cb_.capacity() - cb_.size()) == 0) { + overflowStatus_ = true; + if (stopOnOverflow_) return; + } + cb_.push_back(std::move(pDataBlock)); + } + + int GetCapacity() { + MMThreadGuard g(this->executeLock_); + return (int)cb_.capacity(); + } + + int GetSize() { + MMThreadGuard g(this->executeLock_); + return (int)cb_.size(); + } + + std::unique_ptr Remove() + { + MMThreadGuard g(this->executeLock_); + if (cb_.size() == 0) { + return 0; + } + else { + std::unique_ptr dataBlock = std::move(cb_.at(0)); + cb_.pop_front(); + return dataBlock; + } + } + + void SetOverflowStatus(bool status) + { + MMThreadGuard g(this->executeLock_); + overflowStatus_ = status; + } + + bool GetOverflowStatus() + { + MMThreadGuard g(this->executeLock_); + return overflowStatus_; + } +private: + boost::circular_buffer> cb_; + MMThreadLock executeLock_; + unsigned long maxNumberOfBlocks_; + bool stopOnOverflow_; + bool overflowStatus_; +}; + + +/** +* Base class for creating DataStreamer devices. +*/ +template +class CDataStreamerBase : public CDeviceBase +{ + friend class AcquiredBlockCollection; +public: + CDataStreamerBase() : numberOfBlocks_(1), durationMs_(1000), updatePeriodMs_(100), + stopOnOverflow_(true), stopFlag_(false), acqCollectionCapacity_(10) + { + acqCollection_ = new AcquiredBlockCollection(acqCollectionCapacity_, stopOnOverflow_); + thdAcq_ = new AcquisitionThread(this); + thdProc_ = new ProcessingThread(this); + } + + virtual ~CDataStreamerBase() + { + if (thdAcq_->IsRunning()) { + thdAcq_->Stop(); + thdAcq_->wait(); + } + if (thdProc_->IsRunning()) { + thdProc_->wait(); + } + delete thdAcq_; + delete thdProc_; + delete acqCollection_; + } + + virtual int StartStream() = 0; + + int StartDataStreamerThreads() + { + if (IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + delete acqCollection_; + acqCollection_ = new AcquiredBlockCollection(acqCollectionCapacity_, stopOnOverflow_); + thdAcq_->Start(); + thdProc_->Start(); + return DEVICE_OK; + } + + virtual int StopStream() = 0; + + int StopDataStreamerThreads() { + // a non-blocking implementaion + if (thdAcq_->IsRunning()) { + thdAcq_->Stop(); + //thdAcq_->wait(); + } + if (thdProc_->IsRunning()) { + //thdProc_->wait(); + } + return DEVICE_OK; + } + + virtual int GetBufferSize(unsigned& dataBufferSiize) = 0; + virtual std::unique_ptr GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitCode) = 0; + virtual int ProcessBuffer(std::unique_ptr& pDataBuffer, unsigned actualDataBufferSize) = 0; + + virtual int SetStreamParameters(bool stopOnOverflow, bool pauseAcquisitionBeforeOverflow, int numberOfBlocks, int durationMs, int updatePeriodMs) + { + if (IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + if (numberOfBlocks <= 0 || durationMs <= 0 || updatePeriodMs < 0) return DEVICE_INVALID_INPUT_PARAM; + stopOnOverflow_ = stopOnOverflow; + pauseAcquisitionBeforeOverflow_ = pauseAcquisitionBeforeOverflow; + numberOfBlocks_ = numberOfBlocks; + durationMs_ = durationMs; + updatePeriodMs_ = updatePeriodMs; + return DEVICE_OK; + } + + virtual int GetStreamParameters(bool& stopOnOverflow, bool& pauseAcquisitionBeforeOverflow, int& numberOfBlocks, int& durationMs, int& updatePeriodMs) + { + stopOnOverflow = stopOnOverflow_; + pauseAcquisitionBeforeOverflow = pauseAcquisitionBeforeOverflow_; + numberOfBlocks = numberOfBlocks_; + durationMs = durationMs_; + updatePeriodMs = updatePeriodMs_; + return DEVICE_OK; + } + + virtual int SetAcquisitionPause(bool pause) { + if (!thdAcq_->IsRunning()) return DEVICE_OK; + thdAcq_->SetPause(pause); + return DEVICE_OK; + } + + virtual bool GetAcquisitionPause() { + return thdAcq_->GetPause(); + } + + virtual bool GetOverflowStatus() { + return acqCollection_->GetOverflowStatus(); + } + + virtual int ResetOverflowStatus() { + acqCollection_->SetOverflowStatus(false); + return DEVICE_OK; + } + + virtual bool IsStreaming() { + if (thdAcq_->IsRunning() || thdProc_->IsRunning()) { + return true; + } + else { + return false; + } + } + + virtual int GetAcquisitionExitStatus() { + return thdAcq_->GetExitStatus(); + } + + virtual int GetProcessingExitStatus() { + return thdProc_->GetExitStatus(); + } + + virtual int SetCircularAcquisitionBufferCapacity(int capacity) { + if (IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING; + if (capacity <= 0) return DEVICE_INVALID_INPUT_PARAM; + acqCollectionCapacity_ = capacity; + delete acqCollection_; + acqCollection_ = new AcquiredBlockCollection(acqCollectionCapacity_, stopOnOverflow_); + return DEVICE_OK; + } + + virtual int GetCircularAcquisitionBufferCapacity() { + return acqCollectionCapacity_; + } + + class AcquisitionThread : public MMDeviceThreadBase + { + friend class CDataStreamerBase; + friend class AcquiredBlockCollection; + public: + AcquisitionThread(CDataStreamerBase* p) : isRunning_(false),stopFlag_(false),pause_(false) + { + pDataStreamerBase_ = p; + } + ~AcquisitionThread() {} + + void Start() { + this->SetIsRunning(true); + this->activate(); + } + + void Stop() { + if (this->GetIsRunning()) { + this->SetStopFlag(true); + } + } + + bool IsRunning() { + return this->GetIsRunning(); + } + + private: + CDataStreamerBase* pDataStreamerBase_; + bool isRunning_; + bool stopFlag_; + bool pause_; + int exitStatus_; + MMThreadLock stopLock_; + MMThreadLock isRunningLock_; + MMThreadLock pauseLock_; + MMThreadLock exitStatusLock_; + + // Provide description here + int svc(void) throw () + { + this->SetExitStatus(DEVICE_DATASTREAMER_BUSY_ACQUIRING); + int retGetBufferSize = DEVICE_OK; + unsigned int expectedDataSize, actualDataSize; + int exitStatus = DEVICE_OK; + unsigned int blockCounter = 0; + AcquiredBlockCollection* acqCollection = pDataStreamerBase_->acqCollection_; + MM::MMTime startTime = pDataStreamerBase_->GetCurrentMMTime(); + MM::MMTime timeSinceStart = pDataStreamerBase_->GetCurrentMMTime() - startTime; + MM::MMTime lastCallTime = pDataStreamerBase_->GetCurrentMMTime(); + std::stringstream ss; + try { + // check all stopping conditions + while (!(this->GetStopFlag() || //read externally set stop flag + blockCounter >= pDataStreamerBase_->numberOfBlocks_ || // stop if desired number of blocks have been collected + (acqCollection->GetOverflowStatus() && pDataStreamerBase_->stopOnOverflow_) || // buffer overflow + timeSinceStart > MM::MMTime(pDataStreamerBase_->durationMs_*1000))) { // collection time elapsed + + if (pDataStreamerBase_->pauseAcquisitionBeforeOverflow_) { + // request a pause before overflow happens + if (acqCollection->GetSize() == acqCollection->GetCapacity()) { + this->SetPause(true); + } + else { + this->SetPause(false); + } + } + // pause if requested + if (this->GetPause()) { + Sleep((int)(pDataStreamerBase_->GetCurrentMMTime().getMsec() - lastCallTime.getMsec())); + continue; + } + // check for new data only if updatePeriodMs has elapsed + if (pDataStreamerBase_->updatePeriodMs_ > pDataStreamerBase_->GetCurrentMMTime().getMsec() - lastCallTime.getMsec()) { + Sleep((int)(pDataStreamerBase_->GetCurrentMMTime().getMsec() - lastCallTime.getMsec())); + continue; + } + lastCallTime = pDataStreamerBase_->GetCurrentMMTime(); + // check for new data + retGetBufferSize = pDataStreamerBase_->GetBufferSize(expectedDataSize); + if (retGetBufferSize != DEVICE_OK) break; + if (expectedDataSize != 0) { + std::unique_ptr newBlock(new acquired_block); + newBlock->data = pDataStreamerBase_->GetBuffer(expectedDataSize, actualDataSize, exitStatus); + if (newBlock->data != 0) { + newBlock->expectedSize = expectedDataSize; + newBlock->actualSize = actualDataSize; + acqCollection->Add(newBlock); + blockCounter++; + } + if (exitStatus != DEVICE_OK) break; + } + timeSinceStart = pDataStreamerBase_->GetCurrentMMTime() - startTime; + + } + } + catch (...) { + pDataStreamerBase_->LogMessage("Unknown acquisition thread exception.", true); + this->SetIsRunning(false); + this->SetStopFlag(false); + return DEVICE_ERR; + } + + ss << "Terminating acuisition thread for the following reason: "; + if (this->GetStopFlag()) { + ss << " user selection"; + this->SetExitStatus(DEVICE_DATASTREAMER_STOPPED_ON_USER_SELECTION); + } + else if (acqCollection->GetOverflowStatus() && pDataStreamerBase_->stopOnOverflow_) { + ss << "acquisition buffer overflow"; + this->SetExitStatus(DEVICE_DATASTREAMER_STOPPED_ON_OVERFLOW); + } + else if (blockCounter >= pDataStreamerBase_->numberOfBlocks_) { + ss << "desired number of blocks (" << blockCounter << ") have been collected"; + this->SetExitStatus(DEVICE_DATASTREAMER_STOPPED_ON_NBLOCKS_COLLECTED); + } + else if (timeSinceStart > MM::MMTime(pDataStreamerBase_->durationMs_*1000)) { + ss << "acquisition time exceeded the set limit (" << pDataStreamerBase_->durationMs_ << " milliseconds)"; + this->SetExitStatus(DEVICE_DATASTREAMER_STOPPED_ON_TIME_ELAPSED); + } + else if (retGetBufferSize != DEVICE_OK) { + ss << "GetBufferSize call returned code " << retGetBufferSize; + this->SetExitStatus(retGetBufferSize); + } + else if (exitStatus != DEVICE_OK) { + ss << "GetBuffer call reported exit status " << exitStatus; + this->SetExitStatus(exitStatus); + } + else { + ss << "unknown"; + } + pDataStreamerBase_->LogMessage(ss.str(), true); + + this->SetIsRunning(false); + this->SetStopFlag(false); + return this->GetExitStatus(); + } + + void SetStopFlag(bool stop) { + MMThreadGuard g(this->stopLock_); + stopFlag_ = stop; + } + + bool GetStopFlag() { + MMThreadGuard g(this->stopLock_); + return stopFlag_; + } + + void SetIsRunning(bool isRunning) { + MMThreadGuard g(this->isRunningLock_); + isRunning_ = isRunning; + } + + bool GetIsRunning() { + MMThreadGuard g(this->isRunningLock_); + return isRunning_; + } + + void SetPause(bool pause) { + MMThreadGuard g(this->pauseLock_); + pause_ = pause; + } + + bool GetPause() { + MMThreadGuard g(this->pauseLock_); + return pause_; + } + + void SetExitStatus(int exitStatus) { + MMThreadGuard g(this->exitStatusLock_); + exitStatus_ = exitStatus; + } + + int GetExitStatus() { + MMThreadGuard g(this->exitStatusLock_); + return exitStatus_; + } + + }; + + class ProcessingThread : public MMDeviceThreadBase + { + friend class CDataStreamerBase; + friend class AcquiredBlockCollection; + friend class AcquisitionThread; + public: + ProcessingThread(CDataStreamerBase* p) : isRunning_(false) + { + pDataStreamerBase_ = p; + } + ~ProcessingThread() {} + + void Start() { + this->SetIsRunning(true); + this->activate(); + } + + bool IsRunning() { + return this->GetIsRunning(); + } + + private: + CDataStreamerBase* pDataStreamerBase_; + bool isRunning_; + int exitStatus_; + MMThreadLock isRunningLock_; + MMThreadLock exitStatusLock_; + + // Provide description here + int svc(void) throw () + { + this->SetExitStatus(DEVICE_DATASTREAMER_BUSY_ACQUIRING); + int ret = DEVICE_OK; + AcquisitionThread* acqThr = pDataStreamerBase_->thdAcq_; + AcquiredBlockCollection* acqCollection = pDataStreamerBase_->acqCollection_; + std::stringstream ss; + try { + // give the device time to acquire data + Sleep(pDataStreamerBase_->updatePeriodMs_); + // run while the acquisition thread is active or + // there is unprocessed data in the circular acquisition buffer + while (acqThr->GetIsRunning() || acqCollection->GetSize() != 0) { + if (acqCollection->GetSize() != 0) { + std::unique_ptr newBlock = acqCollection->Remove(); + ret = pDataStreamerBase_->ProcessBuffer(std::move(newBlock->data), newBlock->actualSize); + if (ret != DEVICE_OK) break; + } + else { + Sleep(pDataStreamerBase_->updatePeriodMs_); + } + } + } + catch (...) { + pDataStreamerBase_->LogMessage("Unknown processing thread exception.", true); + this->SetIsRunning(false); + this->SetExitStatus(DEVICE_ERR); + return DEVICE_ERR; + } + + if (ret != DEVICE_OK) { + ss << "ProcessBuffer call returned code " << ret; + pDataStreamerBase_->LogMessage(ss.str(), true); + } + + this->SetExitStatus(ret); + pDataStreamerBase_->LogMessage("Terminating processing thread.", true); + this->SetIsRunning(false); + return ret; + } + + void SetIsRunning(bool isRunning) { + MMThreadGuard g(this->isRunningLock_); + isRunning_ = isRunning; + } + + bool GetIsRunning() { + MMThreadGuard g(this->isRunningLock_); + return isRunning_; + } + + void SetExitStatus(int exitStatus) { + MMThreadGuard g(this->exitStatusLock_); + exitStatus_ = exitStatus; + } + + int GetExitStatus() { + MMThreadGuard g(this->exitStatusLock_); + return exitStatus_; + } + }; + +private: + unsigned numberOfBlocks_; + int durationMs_; + int updatePeriodMs_; + bool stopOnOverflow_; + bool stopFlag_; + bool pauseAcquisitionBeforeOverflow_; + AcquiredBlockCollection* acqCollection_; + unsigned acqCollectionCapacity_; + AcquisitionThread* thdAcq_; + ProcessingThread* thdProc_; +}; + /** * Base class for creating special HUB devices for managing device libraries. */ diff --git a/MMDevice/MMDevice.cpp b/MMDevice/MMDevice.cpp index 36c82ef97..4423dae0e 100644 --- a/MMDevice/MMDevice.cpp +++ b/MMDevice/MMDevice.cpp @@ -46,6 +46,7 @@ const DeviceType SignalIO::Type = SignalIODevice; const DeviceType Magnifier::Type = MagnifierDevice; const DeviceType SLM::Type = SLMDevice; const DeviceType Galvo::Type = GalvoDevice; +const DeviceType DataStreamer::Type = DataStreamerDevice; const DeviceType Hub::Type = HubDevice; } // namespace MM diff --git a/MMDevice/MMDevice.h b/MMDevice/MMDevice.h index cd070d6ff..c1862f634 100644 --- a/MMDevice/MMDevice.h +++ b/MMDevice/MMDevice.h @@ -1176,6 +1176,61 @@ namespace MM { virtual int GetChannel(char* channelName) = 0; }; + /** + * DataStreamer API + */ + class DataStreamer : public Device + { + public: + DataStreamer() {} + virtual ~DataStreamer() {} + + // Device API + virtual DeviceType GetType() const { return Type; } + static const DeviceType Type; + + // DataStreamer API + + // Calls that are specific to each DataStreamer device + // and should be implemented in the device adapter + /** + * To implement, tell the hardware to start streaming data, then call StartDataStreamerThreads + */ + virtual int StartStream() = 0; + /** + * To implement, tell the hardware to stop streaming data, then call StopDataStreamerThreads + */ + virtual int StopStream() = 0; + /** + * Get the expected size (in bytes) of the data buffer available for download from the hardware + */ + virtual int GetBufferSize(unsigned& dataBufferSiize) = 0; + /** + * Receive data buffer from the hardware and place it at memory location + * specified by pDataBuffer; + * update the size of the buffer and pass it as actualDataBufferSize + */ + virtual std::unique_ptr GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitCode) = 0; + /** + * Process the data buffer available at pDataBuffer + */ + virtual int ProcessBuffer(std::unique_ptr& pDataBuffer, unsigned actualDataBufferSize) = 0; + + // Calls that are implemented at the DeviceBase level and + // remain the same for any DataStreamer device + virtual int SetStreamParameters(bool stopOnOverflow, bool pauseAcquisitionBeforeOverflow, int numberOfBuffers, int durationMs, int updatePeriodMs) = 0; + virtual int GetStreamParameters(bool& stopOnOverflow, bool& pauseAcquisitionBeforeOverflow, int& numberOfBuffers, int& durationMs, int& updatePeriodMs) = 0; + virtual int SetAcquisitionPause(bool pause) = 0; + virtual bool GetAcquisitionPause() = 0; + virtual bool GetOverflowStatus() = 0; + virtual int ResetOverflowStatus() = 0; + virtual bool IsStreaming() = 0; + virtual int GetAcquisitionExitStatus() = 0; + virtual int GetProcessingExitStatus() = 0; + virtual int SetCircularAcquisitionBufferCapacity(int capacity) = 0; + virtual int GetCircularAcquisitionBufferCapacity() = 0; + }; + /** * HUB device. Used for complex uber-device functionality in microscope stands * and managing auto-configuration (discovery) of other devices diff --git a/MMDevice/MMDeviceConstants.h b/MMDevice/MMDeviceConstants.h index 058572b91..2043ea4db 100644 --- a/MMDevice/MMDeviceConstants.h +++ b/MMDevice/MMDeviceConstants.h @@ -78,7 +78,11 @@ #define DEVICE_SEQUENCE_TOO_LARGE 39 #define DEVICE_OUT_OF_MEMORY 40 #define DEVICE_NOT_YET_IMPLEMENTED 41 - +#define DEVICE_DATASTREAMER_BUSY_ACQUIRING 42 +#define DEVICE_DATASTREAMER_STOPPED_ON_USER_SELECTION 43 +#define DEVICE_DATASTREAMER_STOPPED_ON_OVERFLOW 44 +#define DEVICE_DATASTREAMER_STOPPED_ON_NBLOCKS_COLLECTED 45 +#define DEVICE_DATASTREAMER_STOPPED_ON_TIME_ELAPSED 46 namespace MM { const int MaxStrLength = 1024; @@ -216,7 +220,8 @@ namespace MM { MagnifierDevice, SLMDevice, HubDevice, - GalvoDevice + GalvoDevice, + DataStreamerDevice }; enum PropertyType {