From 8b90328cd054226b457663fee2f14a76596920ef Mon Sep 17 00:00:00 2001 From: timonmerk Date: Mon, 25 Nov 2024 19:34:26 +0100 Subject: [PATCH] add rawdata stream in original sfreq units --- gui_dev/src/components/RawDataGraph.jsx | 33 +++++++++++-------- gui_dev/src/stores/sessionStore.js | 2 +- gui_dev/src/stores/socketStore.js | 9 ++++- py_neuromodulation/gui/backend/app_backend.py | 4 +-- py_neuromodulation/gui/backend/app_pynm.py | 25 ++++++++------ py_neuromodulation/stream/stream.py | 11 +++++++ 6 files changed, 56 insertions(+), 28 deletions(-) diff --git a/gui_dev/src/components/RawDataGraph.jsx b/gui_dev/src/components/RawDataGraph.jsx index 202c3da6..52e70f33 100644 --- a/gui_dev/src/components/RawDataGraph.jsx +++ b/gui_dev/src/components/RawDataGraph.jsx @@ -31,7 +31,8 @@ export const RawDataGraph = ({ xAxisTitle = "Nr. of Samples", yAxisTitle = "Value", }) => { - const graphData = useSocketStore((state) => state.graphData); + //const graphData = useSocketStore((state) => state.graphData); + const graphRawData = useSocketStore((state) => state.graphRawData); const channels = useSessionStore((state) => state.channels, shallow); @@ -51,7 +52,7 @@ export const RawDataGraph = ({ const graphRef = useRef(null); const plotlyRef = useRef(null); const [yAxisMaxValue, setYAxisMaxValue] = useState("Auto"); - const [maxDataPoints, setMaxDataPoints] = useState(400); + const [maxDataPoints, setMaxDataPoints] = useState(10000); const layoutRef = useRef({ // title: { @@ -117,28 +118,34 @@ export const RawDataGraph = ({ // Process incoming graphData to extract raw data for each channel -> TODO: Check later if this fits here better than socketStore useEffect(() => { - if (!graphData || Object.keys(graphData).length === 0) return; + // if (!graphData || Object.keys(graphData).length === 0) return; + if (!graphRawData || Object.keys(graphRawData).length === 0) return; - const latestData = graphData; + //const latestData = graphData; + const latestData = graphRawData; setRawData((prevRawData) => { const updatedRawData = { ...prevRawData }; Object.entries(latestData).forEach(([key, value]) => { - const { channelName = "", featureName = "" } = getChannelAndFeature( - availableChannels, - key - ); + //const { channelName = "", featureName = "" } = getChannelAndFeature( + // availableChannels, + // key + //); - if (!channelName) return; + //if (!channelName) return; - if (featureName !== "raw") return; + //if (featureName !== "raw") return; + + const channelName = key; + + if (!selectedChannels.includes(key)) return; if (!updatedRawData[channelName]) { updatedRawData[channelName] = []; } - updatedRawData[channelName].push(value); + updatedRawData[channelName].push(...value); if (updatedRawData[channelName].length > maxDataPoints) { updatedRawData[channelName] = updatedRawData[channelName].slice( @@ -149,7 +156,7 @@ export const RawDataGraph = ({ return updatedRawData; }); - }, [graphData, availableChannels, maxDataPoints]); + }, [graphRawData, availableChannels, maxDataPoints]); useEffect(() => { if (!graphRef.current) return; @@ -301,7 +308,7 @@ export const RawDataGraph = ({ step={50} marks min={0} - max={500} + max={10000} /> diff --git a/gui_dev/src/stores/sessionStore.js b/gui_dev/src/stores/sessionStore.js index f7039160..5e3b7d79 100644 --- a/gui_dev/src/stores/sessionStore.js +++ b/gui_dev/src/stores/sessionStore.js @@ -50,7 +50,7 @@ export const useSessionStore = createStore("session", (set, get) => ({ lineNoise: 50, samplingRateFeatures: 11, allValid: false, - experimentName: "subject", + experimentName: "sub", outputDirectory: "default", }, diff --git a/gui_dev/src/stores/socketStore.js b/gui_dev/src/stores/socketStore.js index 2fa0e2a9..c638ba07 100644 --- a/gui_dev/src/stores/socketStore.js +++ b/gui_dev/src/stores/socketStore.js @@ -10,6 +10,7 @@ export const useSocketStore = createStore("socket", (set, get) => ({ status: "disconnected", // 'disconnected', 'connecting', 'connected' error: null, graphData: [], + graphRawData : [], infoMessages: [], reconnectTimer: null, intentionalDisconnect: false, @@ -68,7 +69,13 @@ export const useSocketStore = createStore("socket", (set, get) => ({ const arrayBuffer = event.data; const decodedData = CBOR.decode(arrayBuffer); // console.log("Decoded message from server:", decodedData); - set({graphData: decodedData}); + if (Object.keys(decodedData)[0] == "raw_data") { + set({graphRawData: decodedData.raw_data}); + } else { + set({graphData: decodedData}); + } + + } catch (error) { console.error("Failed to decode CBOR message:", error); } diff --git a/py_neuromodulation/gui/backend/app_backend.py b/py_neuromodulation/gui/backend/app_backend.py index aadd4103..328bc34c 100644 --- a/py_neuromodulation/gui/backend/app_backend.py +++ b/py_neuromodulation/gui/backend/app_backend.py @@ -105,9 +105,7 @@ async def handle_stream_control(data: dict): self.logger.info("Starting stream") self.pynm_state.start_run_function( - # out_dir=data["out_dir"], - # experiment_name=data["experiment_name"], - websocket_manager_features=self.websocket_manager, + websocket_manager=self.websocket_manager, ) diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index e98233cb..fc12f34a 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -11,21 +11,25 @@ from py_neuromodulation import logger async def run_stream_controller(feature_queue: queue.Queue, rawdata_queue: queue.Queue, - websocket_manager_features: "WebSocketManager", stop_event: threading.Event): + websocket_manager: "WebSocketManager", stop_event: threading.Event): while not stop_event.wait(0.002): - if not feature_queue.empty() and websocket_manager_features is not None: + if not feature_queue.empty() and websocket_manager is not None: feature_dict = feature_queue.get() logger.info("Sending message to Websocket") - await websocket_manager_features.send_cbor(feature_dict) - # here the rawdata queue could also be used to send raw data, potentiall through different websocket? + await websocket_manager.send_cbor(feature_dict) + + if not rawdata_queue.empty() and websocket_manager is not None: + raw_data = rawdata_queue.get() + + await websocket_manager.send_cbor(raw_data) def run_stream_controller_sync(feature_queue: queue.Queue, rawdata_queue: queue.Queue, - websocket_manager_features: "WebSocketManager", + websocket_manager: "WebSocketManager", stop_event: threading.Event ): # The run_stream_controller needs to be started as an asyncio function due to the async websocket - asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features, stop_event)) + asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager, stop_event)) class PyNMState: def __init__( @@ -47,7 +51,7 @@ def __init__( def start_run_function( self, - websocket_manager_features=None, + websocket_manager=None, ) -> None: self.stream.settings = self.settings @@ -59,7 +63,8 @@ def start_run_function( self.logger.info("Starting stream_controller_process thread") - # Stop even that is set in the app_backend + # Stop event + # .set() is called from app_backend self.stop_event_ws = threading.Event() self.stream_controller_thread = Thread( @@ -67,7 +72,7 @@ def start_run_function( daemon=True, args=(self.feature_queue, self.rawdata_queue, - websocket_manager_features, + websocket_manager, self.stop_event_ws ), ) @@ -89,7 +94,7 @@ def start_run_function( "stream_lsl_name" : stream_lsl_name, "feature_queue" : self.feature_queue, "simulate_real_time" : True, - #"rawdata_queue" : self.rawdata_queue, + "rawdata_queue" : self.rawdata_queue, }, ) diff --git a/py_neuromodulation/stream/stream.py b/py_neuromodulation/stream/stream.py index 21bcc471..8fdb9aec 100644 --- a/py_neuromodulation/stream/stream.py +++ b/py_neuromodulation/stream/stream.py @@ -208,6 +208,7 @@ def run( return_df: bool = True, simulate_real_time: bool = False, feature_queue: "queue.Queue | None" = None, + rawdata_queue: "queue.Queue | None" = None, stream_handling_queue: "queue.Queue | None" = None, ): self.is_stream_lsl = is_stream_lsl @@ -314,6 +315,16 @@ def run( file_writer.insert_data(feature_dict) if feature_queue is not None: feature_queue.put(feature_dict) + if rawdata_queue is not None: + # convert raw data into dict with new raw data in unit self.sfreq + new_time_ms = 1000 / self.settings.sampling_rate_features_hz + new_samples = int(new_time_ms * self.sfreq / 1000) + data_batch_dict = {} + data_batch_dict["raw_data"] = {} + for i, ch in enumerate(self.channels["name"]): + # needs to be list since cbor doesn't support np array + data_batch_dict["raw_data"][ch] = list(data_batch[i, -new_samples:]) + rawdata_queue.put(data_batch_dict) self.batch_count += 1 if self.batch_count % self.save_interval == 0: