From 447fc329fad4a8ea48449e3f429de7e531322d7e Mon Sep 17 00:00:00 2001 From: Danil Date: Fri, 26 Apr 2024 06:32:50 +0000 Subject: [PATCH] use cuszp module in compressor; add compressors to init.py --- qtensor/compression/Compressor.py | 64 ++++++++++++++----- qtensor/compression/__init__.py | 4 ++ .../tests/test_compressed_tensor.py | 55 ++++++++++------ qtensor/compression/tests/test_memory_leak.py | 3 + .../performance_measurement_decorator.py | 4 +- 5 files changed, 94 insertions(+), 36 deletions(-) diff --git a/qtensor/compression/Compressor.py b/qtensor/compression/Compressor.py index 520f7c5a..407e8410 100644 --- a/qtensor/compression/Compressor.py +++ b/qtensor/compression/Compressor.py @@ -17,6 +17,7 @@ import torch +import cuszp try: from cuszx_wrapper import cuszx_host_compress, cuszx_host_decompress, cuszx_device_compress, cuszx_device_decompress from cuSZp_wrapper import cuszp_device_compress, cuszp_device_decompress @@ -157,17 +158,30 @@ def __init__(self, r2r_error=1e-3, r2r_threshold=1e-3): def free_decompressed(self): import cupy - print("Cleanup", len(self.decompressed_own)) + print("Decompressed data Cleanup", len(self.decompressed_own)) for x in self.decompressed_own: - del x - cupy.get_default_memory_pool().free_all_blocks() - cupy.get_default_pinned_memory_pool().free_all_blocks() - torch.cuda.empty_cache() + cupy.cuda.runtime.free(x) + # del x + # need to run this for every x? + cupy.get_default_memory_pool().free_all_blocks() + #cupy.get_default_pinned_memory_pool().free_all_blocks() + #torch.cuda.empty_cache() self.decompressed_own = [] + #cupy.get_default_memory_pool().free_all_blocks() + #cupy.get_default_pinned_memory_pool().free_all_blocks() + #torch.cuda.empty_cache() + #self.decompressed_own = [] def free_compressed(self, ptr): + #return import ctypes, cupy - cmp_bytes, num_elements_eff, shape, dtype, _ = ptr + #cmp_bytes, num_elements_eff, shape, dtype, _ = ptr + cmp_t_real, cmp_t_imag, shape, dtype = ptr + del cmp_t_real + del cmp_t_imag + torch.cuda.empty_cache() + return + print(f"Freeing compressed data {num_elements_eff}") p_decompressed_ptr = ctypes.addressof(cmp_bytes[0]) # cast to int64 pointer # (effectively converting pointer to pointer to addr to pointer to int64) @@ -175,28 +189,48 @@ def free_compressed(self, ptr): decompressed_int = p_decompressed_int.contents cupy.cuda.runtime.free(decompressed_int.value) cupy.get_default_memory_pool().free_all_blocks() - del cmp_bytes + #del cmp_bytes def compress(self, data): isCupy, num_elements_eff = _get_data_info(data) dtype = data.dtype - print("Compressing") - print(type(data), type(num_elements_eff)) - cmp_bytes, outSize_ptr = cuszp_device_compress(data, self.r2r_error,num_elements_eff, self.r2r_threshold) - return (cmp_bytes, num_elements_eff, data.shape, dtype, outSize_ptr.contents.value) + # convert cupy to torch + data_imag = torch.as_tensor(data.imag, device='cuda').contiguous() + data_real = torch.as_tensor(data.real, device='cuda').contiguous() + print(f"cuszp Compressing {type(data)}") + #cmp_bytes, outSize_ptr = cuszp_device_compress(data, self.r2r_error, num_elements_eff, self.r2r_threshold) + cmp_t_real = cuszp.compress(data_real, self.r2r_error, 'rel') + cmp_t_imag = cuszp.compress(data_imag, self.r2r_error, 'rel') + return (cmp_t_real, cmp_t_imag, data.shape, dtype) # return (cmp_bytes, num_elements_eff, isCuPy, data.shape, dtype, outSize_ptr.contents.value) def compress_size(self, ptr): - return ptr[4] + #return ptr[4] + return ptr[0].nbytes + ptr[1].nbytes def decompress(self, obj): import cupy - cmp_bytes, num_elements_eff, shape, dtype, cmpsize = obj - decompressed_ptr = cuszp_device_decompress(num_elements_eff, cmp_bytes, cmpsize, self, dtype) + #cmp_bytes, num_elements_eff, shape, dtype, cmpsize = obj + #decompressed_ptr = cuszp_device_decompress(num_elements_eff, cmp_bytes, cmpsize, self, dtype) + cmp_t_real, cmp_t_imag, shape, dtype = obj + num_elements_decompressed = 1 + for s in shape: + num_elements_decompressed *= s + decomp_t_real = cuszp.decompress(cmp_t_real, num_elements_decompressed, cmp_t_real.nbytes, self.r2r_error, 'rel') + decomp_t_imag = cuszp.decompress(cmp_t_imag, num_elements_decompressed, cmp_t_imag.nbytes, self.r2r_error, 'rel') + decomp_t = decomp_t_real + 1j * decomp_t_imag + arr_cp = cupy.asarray(decomp_t) + arr = cupy.reshape(arr_cp, shape) + return arr arr_cp = decompressed_ptr[0] + # Cupy memory management might not deallocate memory properly + #arr = cupy.reshape(arr_cp, shape) + #self.decompressed_own.append(arr) + # Use pointer instead, as in cuszx + arr_cp = decompressed_ptr[0] + self.decompressed_own.append(decompressed_ptr[1]) arr = cupy.reshape(arr_cp, shape) - self.decompressed_own.append(arr) return arr class TorchCompressor(Compressor): diff --git a/qtensor/compression/__init__.py b/qtensor/compression/__init__.py index 9e320426..7a69a45c 100644 --- a/qtensor/compression/__init__.py +++ b/qtensor/compression/__init__.py @@ -4,7 +4,11 @@ CUSZCompressor, CUSZXCompressor, ProfileCompressor, + CUSZPCompressor, + TorchCompressor, ) from .CompressedTensor import CompressedTensor, Tensor from .compressed_contraction import compressed_contract, compressed_sum from .cost_estimation import compressed_contraction_cost + + diff --git a/qtensor/compression/tests/test_compressed_tensor.py b/qtensor/compression/tests/test_compressed_tensor.py index 29ad3243..dd71f97d 100644 --- a/qtensor/compression/tests/test_compressed_tensor.py +++ b/qtensor/compression/tests/test_compressed_tensor.py @@ -1,10 +1,16 @@ from qtensor.compression import CompressedTensor -from qtensor.compression import NumpyCompressor, CUSZCompressor +from qtensor.compression import ( + NumpyCompressor, + CUSZPCompressor, + CUSZXCompressor, + TorchCompressor, +) from qtree.optimizer import Var from qtree.system_defs import NP_ARRAY_TYPE import pytest import numpy as np + def test_empty_tensor(): shape = (2, 3, 4) indices = [Var(i, size=s) for i, s in enumerate(shape)] @@ -42,27 +48,38 @@ def test_slice_tensor(): assert S.data is not None assert np.allclose(t.get_chunk([1, 2]), S.data) -@pytest.mark.parametrize(argnames=["shape", "compressor", "dtype"], - argvalues=[ - ((2, 3, 4), NumpyCompressor(), np.float32), - ((2, 3, 4), NumpyCompressor(), np.float64), - ((2, 3, 4), CUSZCompressor(), np.float32), - ((2, 3, 4), CUSZCompressor(), np.float64), - ((2, 3, 4), CUSZCompressor(), np.complex128), - ((2,)*20, CUSZCompressor(), np.float32), - ((2,)*20, CUSZCompressor(), np.complex64), - # Not supported: - #((2,)*20, CUSZCompressor(), np.float64) - ] - ) -def test_compressors(shape, compressor, dtype): - print(shape, compressor, dtype) + +@pytest.mark.parametrize( + argnames=["shape", "compressor_cls", "dtype"], + argvalues=[ + ((2, 3, 4), NumpyCompressor, np.float32), + ((2, 3, 4), NumpyCompressor, np.float64), + #((2,) * 20, TorchCompressor, np.complex64), + ((2,) * 20, CUSZXCompressor, np.complex64), + ((2,) * 20, CUSZPCompressor, np.complex64), + + # Not supported: + # ((2, 3, 4), CUSZXCompressor, np.float32), + # ((2, 3, 4), CUSZXCompressor, np.float64), + # ((2, 3, 4), CUSZXCompressor, np.complex128), + # ((2,)*20, CUSZXCompressor, np.float32), + # ((2,)*20, CUSZCompressor(), np.float64) + ], +) +def test_compressors(shape, compressor_cls, dtype): + print(shape, compressor_cls, dtype) + compressor = compressor_cls() import cupy + indices = [Var(i, size=s) for i, s in enumerate(shape)] if dtype is np.complex128: - data = cupy.random.random(shape, dtype=np.float64) + 1j*cupy.random.random(shape, dtype=np.float64) + data = cupy.random.random(shape, dtype=np.float64) + 1j * cupy.random.random( + shape, dtype=np.float64 + ) elif dtype is np.complex64: - data = cupy.random.random(shape, dtype=np.float32) + 1j*cupy.random.random(shape, dtype=np.float32) + data = cupy.random.random(shape, dtype=np.float32) + 1j * cupy.random.random( + shape, dtype=np.float32 + ) else: data = cupy.random.random(shape, dtype=dtype) t = CompressedTensor("myT", indices, data=data, compressor=compressor) @@ -76,4 +93,4 @@ def test_compressors(shape, compressor, dtype): ref = cupy.asnumpy(s.data) assert np.allclose(ch, ref) - assert np.allclose(ch, data[1], rtol=0.1, atol=.01) + assert np.allclose(ch, data[1], rtol=0.15, atol=0.05) diff --git a/qtensor/compression/tests/test_memory_leak.py b/qtensor/compression/tests/test_memory_leak.py index e0ca675c..34f327a2 100644 --- a/qtensor/compression/tests/test_memory_leak.py +++ b/qtensor/compression/tests/test_memory_leak.py @@ -106,3 +106,6 @@ def test_leak_contract(): print( f"== [{j}] Memory history: {[np.round(x, 2) for x in _mem_histories]} GB ==" ) + +if __name__ == "__main__": + test_leak_contract() diff --git a/qtensor/contraction_backends/performance_measurement_decorator.py b/qtensor/contraction_backends/performance_measurement_decorator.py index 39efffa7..c2ae5bc3 100644 --- a/qtensor/contraction_backends/performance_measurement_decorator.py +++ b/qtensor/contraction_backends/performance_measurement_decorator.py @@ -77,7 +77,7 @@ def check_store(self): if isinstance(self.backend, CompressionBackend): gpu_mem += 8*2**self.backend.max_tw self.mem_history.append(dict( - mem=gpu_mem, + mem=total_mem, cupy_bufsize=mempool.total_bytes(), nvmem = self._get_nvsmi_mem(), cupybuf=mempool.total_bytes(), @@ -85,7 +85,7 @@ def check_store(self): tensors_sizes=[len(tensor.indices) for tensor in self.object_store.values()] )) # -- - print('MH', self.mem_history[-1]) + #print('MH', self.mem_history[-1]) if cupy_mem>1024**2: self._print("CuPy memory usage", cupy_mem/1024/1024, "MB. Total MB:", mempool.total_bytes()/1024**2)