diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java index c0d3dce7..b9dd8f4c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.tools.ZstdWrapper; import org.apache.commons.lang3.tuple.Pair; import java.util.ArrayList; @@ -15,6 +16,11 @@ * This class provides a client tool to access pegasus data. */ public class PegasusCli { + public enum CompressionType { + none, + zstd, + } + public static void usage() { System.out.println(); System.out.println("USAGE: PegasusCli ..."); @@ -35,6 +41,8 @@ public static void usage() { System.out.println(" - incr [increment]"); System.out.println(" - scan [start_sort_key] [stop_sort_key] [max_count]"); System.out.println(" - scan_all [max_count]"); + System.out.println(" - copy_data " + + "[read_uncompress_type(none|zstd)] [write_compress_type(none|zstd)] [max_count]"); System.out.println(); System.out.println(" For example:"); System.out.println(" PegasusCli file://./pegasus.properties temp get hash_key sort_key"); @@ -62,6 +70,10 @@ public static void main(String args[]) { byte[] startSortKey = null; byte[] stopSortKey = null; int maxCount = Integer.MAX_VALUE; + String targetClusterConfigPath = null; + String targetTableName = null; + CompressionType readUncompressType = CompressionType.none; + CompressionType writeCompressType = CompressionType.none; if (opName.equals("get") || opName.equals("del")) { if (args.length != 2) { System.out.println("ERROR: invalid parameter count"); @@ -164,6 +176,24 @@ else if (opName.equals("scan_all")) { maxCount = Integer.parseInt(args[0]); } } + else if (opName.equals("copy_data")) { + if (args.length < 2) { + System.out.println("ERROR: invalid parameter count"); + usage(); + return; + } + targetClusterConfigPath = args[0]; + targetTableName = args[1]; + if (args.length > 2) { + readUncompressType = CompressionType.valueOf(args[2]); + } + if (args.length > 3) { + writeCompressType = CompressionType.valueOf(args[3]); + } + if (args.length > 4) { + maxCount = Integer.parseInt(args[4]); + } + } else { System.out.println("ERROR: invalid op-name: " + opName); usage(); @@ -235,6 +265,7 @@ else if (opName.equals("scan")) { new String(p.getKey().getValue()), new String(p.getValue())); count++; } + scanner.close(); if (count > 0) System.out.println(); System.out.printf("%d key-value pairs got.\n", count); @@ -250,10 +281,48 @@ else if (opName.equals("scan_all")) { new String(p.getKey().getValue()), new String(p.getValue())); count++; } + scanner.close(); } if (count > 0) System.out.println(); System.out.printf("%d key-value pairs got.\n", count); + } else if (opName.equals("copy_data")) { + PegasusClientInterface targetClient = PegasusClientFactory.createClient(targetClusterConfigPath); + try { + PegasusTableInterface targetTable = targetClient.openTable(targetTableName); + List scanners = client.getUnorderedScanners(appName, 1, new ScanOptions()); + int count = 0; + if (scanners.size() > 0) { + PegasusScannerInterface scanner = scanners.get(0); + Pair, byte[]> p; + while (count < maxCount && (p = scanner.next()) != null) { + byte[] newValue = p.getValue(); + switch (readUncompressType) { + case none: + break; + case zstd: + newValue = ZstdWrapper.decompress(newValue); + break; + } + switch (writeCompressType) { + case none: + break; + case zstd: + newValue = ZstdWrapper.compress(newValue); + break; + } + targetTable.set(p.getKey().getKey(), p.getKey().getValue(), newValue, 0); + count++; + if (count % 10000 == 0) { + System.out.printf("Copied %d key-value pairs.\n", count); + } + } + scanner.close(); + } + System.out.printf("Done, copied %d key-value pairs.\n", count); + } finally { + targetClient.close(); + } } } catch (PException e) {