Skip to content

Commit

Permalink
add support for non-bzImage kernels
Browse files Browse the repository at this point in the history
Signed-off-by: Jannik Glückert <[email protected]>
Closes: projg2#10
Signed-off-by: Michał Górny <[email protected]>
  • Loading branch information
Jannik2099 authored and mgorny committed Oct 26, 2020
1 parent cf7054a commit 93f109e
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 15 deletions.
78 changes: 67 additions & 11 deletions ecleankernel/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import enum
import errno
import importlib
import os
import shutil
import struct
Expand All @@ -27,6 +28,10 @@ class UnrecognizedKernelError(Exception):
pass


class MissingDecompressorError(Exception):
pass


class GenericFile(object):
"""A generic file associated with a kernel"""

Expand Down Expand Up @@ -91,6 +96,38 @@ def __init__(self,
super().__init__(path, KernelFileType.KERNEL)
self.internal_version = self.read_internal_version()

def decompress_raw(self) -> bytes:
f = open(self.path, 'rb')
magic_dict = {
b'\x1f\x8b\x08': 'gzip',
b'\x42\x5a\x68': 'bz2',
b'\xfd\x37\x7a\x58\x5a\x00': 'lzma',
b'\x5d\x00\x00': 'lzma',
b'\x04\x22\x4d\x18': 'lz4.frame',
b'\x28\xb5\x2f\xfd': 'zstandard',
b'\x89\x4c\x5a\x4f\x00\x0d\x0a\x1a\x0a': 'lzo',
}
maxlen = max(len(x) for x in magic_dict)
header = f.read(maxlen)
f.seek(0)
for magic, comp in magic_dict.items():
if header.startswith(magic):
try:
mod = importlib.import_module(comp)
except ModuleNotFoundError:
raise MissingDecompressorError(
f'Kernel file {self.path} is compressed with '
f'{comp}, but the required decompressor '
f'is not installed')
if comp == 'zstandard':
# Technically a redundant import, this is just
# to make your IDE happy :)
import zstandard
return zstandard.ZstdDecompressor().decompress(f.read())
else:
return getattr(mod, 'decompress')(f.read())
return f.read()

def read_internal_version(self) -> str:
"""Read version from the kernel file"""
f = open(self.path, 'rb')
Expand All @@ -101,18 +138,37 @@ def read_internal_version(self) -> str:
raise UnrecognizedKernelError(
f'Kernel file {self.path} terminates before bzImage '
f'header')
if buf[2:6] != b'HdrS':
raise UnrecognizedKernelError(
f'Unmatched magic for kernel file {self.path} '
f'({repr(buf[2:6])} != b"HdrS")')
offset = struct.unpack_from('H', buf, 0x0e)[0]
f.seek(offset - 0x10, 1)
buf = f.read(0x100) # XXX
if not buf:
if buf[2:6] == b'HdrS':
offset = struct.unpack_from('H', buf, 0x0e)[0]
f.seek(offset - 0x10, 1)
buf = f.read(0x100) # XXX
if not buf:
raise UnrecognizedKernelError(
f'Kernel file {self.path} terminates before expected '
f'version string position ({offset + 0x200})')
ret = buf.split(b' ', 1)
else:
# If it's not a bzImage it must be a raw binary,
# check if it's compressed first
b = self.decompress_raw()
# unlike with bzImage, the raw kernel binary has no header
# that includes the version, so we parse the version message
# that appears on boot
ver_start = 'Linux version '
pos = b.find(ver_start.encode())
if pos == -1:
raise UnrecognizedKernelError(
f'Kernel file {self.path} does not appear '
f'to have a version string, '
f'or the compression format was not recognized')
pos += len(ver_start)
sbuf = b[pos:pos + 0x100]
ret = sbuf.split(b' ', 1)
if len(ret) == 1:
raise UnrecognizedKernelError(
f'Kernel file {self.path} terminates before expected '
f'version string position ({offset + 0x200})')
return buf.split(b' ', 1)[0].decode()
f'Kernel file {self.path} terminates '
f'before end of version string')
return ret[0].decode()

def __repr__(self) -> str:
return (f'KernelImage({repr(self.path)})')
Expand Down
9 changes: 9 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@ no_implicit_optional = True

[mypy-pymountboot.*]
ignore_missing_imports = True

[mypy-lz4.*]
ignore_missing_imports = True

[mypy-lzo.*]
ignore_missing_imports = True

[mypy-zstandard.*]
ignore_missing_imports = True
78 changes: 76 additions & 2 deletions test/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
# (c) 2020 Michał Górny <[email protected]>
# Released under the terms of the 2-clause BSD license.

import gzip
import hashlib
import os
import tempfile
import unittest

from importlib import util
from pathlib import Path

from ecleankernel.file import (KernelImage, ModuleDirectory,
UnrecognizedKernelError)
UnrecognizedKernelError,
MissingDecompressorError)


def write_bzImage(path: Path,
Expand All @@ -24,20 +28,60 @@ def write_bzImage(path: Path,
f.write(version_line)


def write_raw(path: Path,
version_line: bytes
) -> None:
"""Write a raw kernel binary at `path`, with `version_line`"""
with open(path, 'wb') as f:
f.write(0x210 * b'\0')
f.write(version_line)


def write_compress(path: Path,
version_line: bytes
) -> None:
"""Write a gzip compressed raw kernel binary at `path`,
with `version_line`"""
with gzip.open(path, 'wb') as f:
# gzip would compress a string of 0s below 0x200,
# so we fill in some incompressible gibberish
s = b''
for i in range(1, 0xff):
m = hashlib.sha1()
m.update(i.to_bytes(1, 'little'))
s += m.digest()
f.write(s)
f.write(version_line)


class KernelImageTests(unittest.TestCase):
def setUp(self) -> None:
self.td = tempfile.TemporaryDirectory()

def tearDown(self) -> None:
self.td.cleanup()

def test_read_internal_version(self) -> None:
def test_read_internal_version_bz(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
write_bzImage(path, b'1.2.3 built on test')
self.assertEqual(
KernelImage(path).read_internal_version(),
'1.2.3')

def test_read_internal_version_raw(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
write_raw(path, b'Linux version 1.2.3 built on test')
self.assertEqual(
KernelImage(path).read_internal_version(),
'1.2.3')

def test_read_internal_version_compress(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
write_compress(path, b'Linux version 1.2.3 built on test')
self.assertEqual(
KernelImage(path).read_internal_version(),
'1.2.3')

def test_very_short(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
with open(path, 'wb') as f:
Expand All @@ -52,6 +96,36 @@ def test_bad_magic(self) -> None:
with self.assertRaises(UnrecognizedKernelError):
KernelImage(path).read_internal_version()

def test_bad_file_magic(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
with open(path, 'w') as f:
f.write('Hello World')
with self.assertRaises(UnrecognizedKernelError):
KernelImage(path).read_internal_version()

def test_missing_decompressor(self) -> None:
if util.find_spec('lzo'):
self.skipTest('the missing decompressor is installed')
path = Path(self.td.name) / 'vmlinuz'
with open(path, 'wb') as f:
f.write(b'\x89\x4c\x5a\x4f\x00\x0d\x0a\x1a\x0a')
f.write(0x210 * b'\0')
with self.assertRaises(MissingDecompressorError):
KernelImage(path).read_internal_version()

def test_overflow_ver_string_bz(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
path = Path(self.td.name) / 'vmlinuz'
write_bzImage(path, b'1.2.3' + 0xffff * b'\0')
with self.assertRaises(UnrecognizedKernelError):
KernelImage(path).read_internal_version()

def test_overflow_ver_string_raw(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
write_raw(path, b'Linux version 1.2.3' + 0xffff * b'\0')
with self.assertRaises(UnrecognizedKernelError):
KernelImage(path).read_internal_version()

def test_short(self) -> None:
path = Path(self.td.name) / 'vmlinuz'
with open(path, 'wb') as f:
Expand Down
4 changes: 2 additions & 2 deletions test/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def setUp(self) -> None:

self.td = tempfile.TemporaryDirectory()
td = Path(self.td.name)
write_bzImage(td / 'kernel.old', b'old')
write_bzImage(td / 'kernel.new', b'new')
write_bzImage(td / 'kernel.old', b'old built on test')
write_bzImage(td / 'kernel.new', b'new built on test')
with open(td / 'config-stray', 'w'):
pass
with open(td / 'initrd-stray.img', 'w'):
Expand Down

0 comments on commit 93f109e

Please sign in to comment.