Skip to content

Commit

Permalink
add rawdata stream in original sfreq units
Browse files Browse the repository at this point in the history
  • Loading branch information
timonmerk committed Nov 25, 2024
1 parent b326e2a commit 8b90328
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 28 deletions.
33 changes: 20 additions & 13 deletions gui_dev/src/components/RawDataGraph.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ export const RawDataGraph = ({
xAxisTitle = "Nr. of Samples",
yAxisTitle = "Value",
}) => {
const graphData = useSocketStore((state) => state.graphData);
//const graphData = useSocketStore((state) => state.graphData);
const graphRawData = useSocketStore((state) => state.graphRawData);

const channels = useSessionStore((state) => state.channels, shallow);

Expand All @@ -51,7 +52,7 @@ export const RawDataGraph = ({
const graphRef = useRef(null);
const plotlyRef = useRef(null);
const [yAxisMaxValue, setYAxisMaxValue] = useState("Auto");
const [maxDataPoints, setMaxDataPoints] = useState(400);
const [maxDataPoints, setMaxDataPoints] = useState(10000);

const layoutRef = useRef({
// title: {
Expand Down Expand Up @@ -117,28 +118,34 @@ export const RawDataGraph = ({

// Process incoming graphData to extract raw data for each channel -> TODO: Check later if this fits here better than socketStore
useEffect(() => {
if (!graphData || Object.keys(graphData).length === 0) return;
// if (!graphData || Object.keys(graphData).length === 0) return;
if (!graphRawData || Object.keys(graphRawData).length === 0) return;

const latestData = graphData;
//const latestData = graphData;
const latestData = graphRawData;

setRawData((prevRawData) => {
const updatedRawData = { ...prevRawData };

Object.entries(latestData).forEach(([key, value]) => {
const { channelName = "", featureName = "" } = getChannelAndFeature(
availableChannels,
key
);
//const { channelName = "", featureName = "" } = getChannelAndFeature(
// availableChannels,
// key
//);

if (!channelName) return;
//if (!channelName) return;

if (featureName !== "raw") return;
//if (featureName !== "raw") return;

const channelName = key;

if (!selectedChannels.includes(key)) return;

if (!updatedRawData[channelName]) {
updatedRawData[channelName] = [];
}

updatedRawData[channelName].push(value);
updatedRawData[channelName].push(...value);

if (updatedRawData[channelName].length > maxDataPoints) {
updatedRawData[channelName] = updatedRawData[channelName].slice(
Expand All @@ -149,7 +156,7 @@ export const RawDataGraph = ({

return updatedRawData;
});
}, [graphData, availableChannels, maxDataPoints]);
}, [graphRawData, availableChannels, maxDataPoints]);

useEffect(() => {
if (!graphRef.current) return;
Expand Down Expand Up @@ -301,7 +308,7 @@ export const RawDataGraph = ({
step={50}
marks
min={0}
max={500}
max={10000}
/>
</CollapsibleBox>
</Box>
Expand Down
2 changes: 1 addition & 1 deletion gui_dev/src/stores/sessionStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export const useSessionStore = createStore("session", (set, get) => ({
lineNoise: 50,
samplingRateFeatures: 11,
allValid: false,
experimentName: "subject",
experimentName: "sub",
outputDirectory: "default",
},

Expand Down
9 changes: 8 additions & 1 deletion gui_dev/src/stores/socketStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const useSocketStore = createStore("socket", (set, get) => ({
status: "disconnected", // 'disconnected', 'connecting', 'connected'
error: null,
graphData: [],
graphRawData : [],
infoMessages: [],
reconnectTimer: null,
intentionalDisconnect: false,
Expand Down Expand Up @@ -68,7 +69,13 @@ export const useSocketStore = createStore("socket", (set, get) => ({
const arrayBuffer = event.data;
const decodedData = CBOR.decode(arrayBuffer);
// console.log("Decoded message from server:", decodedData);
set({graphData: decodedData});
if (Object.keys(decodedData)[0] == "raw_data") {
set({graphRawData: decodedData.raw_data});
} else {
set({graphData: decodedData});
}


} catch (error) {
console.error("Failed to decode CBOR message:", error);
}
Expand Down
4 changes: 1 addition & 3 deletions py_neuromodulation/gui/backend/app_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ async def handle_stream_control(data: dict):
self.logger.info("Starting stream")

self.pynm_state.start_run_function(
# out_dir=data["out_dir"],
# experiment_name=data["experiment_name"],
websocket_manager_features=self.websocket_manager,
websocket_manager=self.websocket_manager,
)


Expand Down
25 changes: 15 additions & 10 deletions py_neuromodulation/gui/backend/app_pynm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@
from py_neuromodulation import logger

async def run_stream_controller(feature_queue: queue.Queue, rawdata_queue: queue.Queue,
websocket_manager_features: "WebSocketManager", stop_event: threading.Event):
websocket_manager: "WebSocketManager", stop_event: threading.Event):
while not stop_event.wait(0.002):
if not feature_queue.empty() and websocket_manager_features is not None:
if not feature_queue.empty() and websocket_manager is not None:
feature_dict = feature_queue.get()
logger.info("Sending message to Websocket")
await websocket_manager_features.send_cbor(feature_dict)
# here the rawdata queue could also be used to send raw data, potentiall through different websocket?
await websocket_manager.send_cbor(feature_dict)

if not rawdata_queue.empty() and websocket_manager is not None:
raw_data = rawdata_queue.get()

await websocket_manager.send_cbor(raw_data)

def run_stream_controller_sync(feature_queue: queue.Queue,
rawdata_queue: queue.Queue,
websocket_manager_features: "WebSocketManager",
websocket_manager: "WebSocketManager",
stop_event: threading.Event
):
# The run_stream_controller needs to be started as an asyncio function due to the async websocket
asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features, stop_event))
asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager, stop_event))

class PyNMState:
def __init__(
Expand All @@ -47,7 +51,7 @@ def __init__(

def start_run_function(
self,
websocket_manager_features=None,
websocket_manager=None,
) -> None:

self.stream.settings = self.settings
Expand All @@ -59,15 +63,16 @@ def start_run_function(
self.logger.info("Starting stream_controller_process thread")


# Stop even that is set in the app_backend
# Stop event
# .set() is called from app_backend
self.stop_event_ws = threading.Event()

self.stream_controller_thread = Thread(
target=run_stream_controller_sync,
daemon=True,
args=(self.feature_queue,
self.rawdata_queue,
websocket_manager_features,
websocket_manager,
self.stop_event_ws
),
)
Expand All @@ -89,7 +94,7 @@ def start_run_function(
"stream_lsl_name" : stream_lsl_name,
"feature_queue" : self.feature_queue,
"simulate_real_time" : True,
#"rawdata_queue" : self.rawdata_queue,
"rawdata_queue" : self.rawdata_queue,
},
)

Expand Down
11 changes: 11 additions & 0 deletions py_neuromodulation/stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def run(
return_df: bool = True,
simulate_real_time: bool = False,
feature_queue: "queue.Queue | None" = None,
rawdata_queue: "queue.Queue | None" = None,
stream_handling_queue: "queue.Queue | None" = None,
):
self.is_stream_lsl = is_stream_lsl
Expand Down Expand Up @@ -314,6 +315,16 @@ def run(
file_writer.insert_data(feature_dict)
if feature_queue is not None:
feature_queue.put(feature_dict)
if rawdata_queue is not None:
# convert raw data into dict with new raw data in unit self.sfreq
new_time_ms = 1000 / self.settings.sampling_rate_features_hz
new_samples = int(new_time_ms * self.sfreq / 1000)
data_batch_dict = {}
data_batch_dict["raw_data"] = {}
for i, ch in enumerate(self.channels["name"]):
# needs to be list since cbor doesn't support np array
data_batch_dict["raw_data"][ch] = list(data_batch[i, -new_samples:])
rawdata_queue.put(data_batch_dict)

self.batch_count += 1
if self.batch_count % self.save_interval == 0:
Expand Down

0 comments on commit 8b90328

Please sign in to comment.