From 00c7356040b4922d1dec1547e3330396326c16f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Herman=20=C3=98ie=20Kolden?= Date: Sun, 30 Jun 2024 15:05:05 +0200 Subject: [PATCH] core: support writing ulog to bytes handle Previously only writing to files was possible. It's useful to be able to write to BytesIO for e.g. returning generated ULog files without having to write an intermediary file. --- pyulog/core.py | 10 ++++++++-- test/test_ulog.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/pyulog/core.py b/pyulog/core.py index 6069325..a712e88 100644 --- a/pyulog/core.py +++ b/pyulog/core.py @@ -3,6 +3,7 @@ import struct import copy import sys +import contextlib import numpy as np #pylint: disable=too-many-instance-attributes, unused-argument, missing-docstring #pylint: disable=protected-access, too-many-branches @@ -236,9 +237,14 @@ def get_dataset(self, name, multi_instance=0): return [elem for elem in self._data_list if elem.name == name and elem.multi_id == multi_instance][0] - def write_ulog(self, path): + def write_ulog(self, log_file): """ write current data back into a ulog file """ - with open(path, "wb") as ulog_file: + if isinstance(log_file, str): + handle = open(log_file, "wb") + else: + handle = contextlib.nullcontext(log_file) + + with handle as ulog_file: # Definition section self._write_file_header(ulog_file) self._write_flags(ulog_file) diff --git a/test/test_ulog.py b/test/test_ulog.py index 85f95c3..4c26909 100644 --- a/test/test_ulog.py +++ b/test/test_ulog.py @@ -6,6 +6,7 @@ import inspect import unittest import tempfile +from io import BytesIO from ddt import ddt, data @@ -53,4 +54,33 @@ def test_write_ulog(self, base_name): else: assert copied_value == original_value + @data('sample') + def test_write_ulog_memory(self, base_name): + ''' + Test that the write_ulog method can write bytes to memory. + ''' + ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg') + original = pyulog.ULog(ulog_file_name) + with BytesIO() as bytes_handle: + original.write_ulog(bytes_handle) + bytes_handle.seek(0) + copied = pyulog.ULog(bytes_handle) + + for original_key, original_value in original.__dict__.items(): + copied_value = getattr(copied, original_key) + if original_key == '_sync_seq_cnt': + # Sync messages are counted on parse, but otherwise dropped, so + # we don't rewrite them + assert copied_value == 0 + elif original_key == '_appended_offsets': + # Abruptly ended messages just before offsets are dropped, so + # we don't rewrite appended offsets + assert copied_value == [] + elif original_key == '_incompat_flags': + # Same reasoning on incompat_flags[0] as for '_appended_offsets' + assert copied_value[0] == original_value[0] & 0xFE # pylint: disable=unsubscriptable-object + assert copied_value[1:] == original_value[1:] # pylint: disable=unsubscriptable-object + else: + assert copied_value == original_value + # vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4