Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: support writing ulog to bytes handle #101

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pyulog/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import struct
import copy
import sys
import contextlib
import numpy as np
#pylint: disable=too-many-instance-attributes, unused-argument, missing-docstring
#pylint: disable=protected-access, too-many-branches
Expand Down Expand Up @@ -236,9 +237,14 @@ def get_dataset(self, name, multi_instance=0):
return [elem for elem in self._data_list
if elem.name == name and elem.multi_id == multi_instance][0]

def write_ulog(self, path):
def write_ulog(self, log_file):
""" write current data back into a ulog file """
with open(path, "wb") as ulog_file:
if isinstance(log_file, str):
handle = open(log_file, "wb")
else:
handle = contextlib.nullcontext(log_file)

with handle as ulog_file:
# Definition section
self._write_file_header(ulog_file)
self._write_flags(ulog_file)
Expand Down
30 changes: 30 additions & 0 deletions test/test_ulog.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import inspect
import unittest
import tempfile
from io import BytesIO

from ddt import ddt, data

Expand Down Expand Up @@ -53,4 +54,33 @@ def test_write_ulog(self, base_name):
else:
assert copied_value == original_value

@data('sample')
def test_write_ulog_memory(self, base_name):
'''
Test that the write_ulog method can write bytes to memory.
'''
ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg')
original = pyulog.ULog(ulog_file_name)
with BytesIO() as bytes_handle:
original.write_ulog(bytes_handle)
bytes_handle.seek(0)
copied = pyulog.ULog(bytes_handle)

for original_key, original_value in original.__dict__.items():
copied_value = getattr(copied, original_key)
if original_key == '_sync_seq_cnt':
# Sync messages are counted on parse, but otherwise dropped, so
# we don't rewrite them
assert copied_value == 0
elif original_key == '_appended_offsets':
# Abruptly ended messages just before offsets are dropped, so
# we don't rewrite appended offsets
assert copied_value == []
elif original_key == '_incompat_flags':
# Same reasoning on incompat_flags[0] as for '_appended_offsets'
assert copied_value[0] == original_value[0] & 0xFE # pylint: disable=unsubscriptable-object
assert copied_value[1:] == original_value[1:] # pylint: disable=unsubscriptable-object
else:
assert copied_value == original_value

# vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4
Loading