Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
add copy_data operator in PegasusCli (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan authored and huangwei5 committed Jan 4, 2019
1 parent d921c17 commit 092e142
Showing 1 changed file with 69 additions and 0 deletions.
69 changes: 69 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.tools.ZstdWrapper;
import org.apache.commons.lang3.tuple.Pair;

import java.util.ArrayList;
Expand All @@ -15,6 +16,11 @@
* This class provides a client tool to access pegasus data.
*/
public class PegasusCli {
public enum CompressionType {
none,
zstd,
}

public static void usage() {
System.out.println();
System.out.println("USAGE: PegasusCli <config-path> <table-name> <op-name> ...");
Expand All @@ -35,6 +41,8 @@ public static void usage() {
System.out.println(" - incr <hash_key> <sort_key> [increment]");
System.out.println(" - scan <hash_key> [start_sort_key] [stop_sort_key] [max_count]");
System.out.println(" - scan_all [max_count]");
System.out.println(" - copy_data <target_cluster_config_path> <target_table_name> " +
"[read_uncompress_type(none|zstd)] [write_compress_type(none|zstd)] [max_count]");
System.out.println();
System.out.println(" For example:");
System.out.println(" PegasusCli file://./pegasus.properties temp get hash_key sort_key");
Expand Down Expand Up @@ -62,6 +70,10 @@ public static void main(String args[]) {
byte[] startSortKey = null;
byte[] stopSortKey = null;
int maxCount = Integer.MAX_VALUE;
String targetClusterConfigPath = null;
String targetTableName = null;
CompressionType readUncompressType = CompressionType.none;
CompressionType writeCompressType = CompressionType.none;
if (opName.equals("get") || opName.equals("del")) {
if (args.length != 2) {
System.out.println("ERROR: invalid parameter count");
Expand Down Expand Up @@ -164,6 +176,24 @@ else if (opName.equals("scan_all")) {
maxCount = Integer.parseInt(args[0]);
}
}
else if (opName.equals("copy_data")) {
if (args.length < 2) {
System.out.println("ERROR: invalid parameter count");
usage();
return;
}
targetClusterConfigPath = args[0];
targetTableName = args[1];
if (args.length > 2) {
readUncompressType = CompressionType.valueOf(args[2]);
}
if (args.length > 3) {
writeCompressType = CompressionType.valueOf(args[3]);
}
if (args.length > 4) {
maxCount = Integer.parseInt(args[4]);
}
}
else {
System.out.println("ERROR: invalid op-name: " + opName);
usage();
Expand Down Expand Up @@ -235,6 +265,7 @@ else if (opName.equals("scan")) {
new String(p.getKey().getValue()), new String(p.getValue()));
count++;
}
scanner.close();
if (count > 0)
System.out.println();
System.out.printf("%d key-value pairs got.\n", count);
Expand All @@ -250,10 +281,48 @@ else if (opName.equals("scan_all")) {
new String(p.getKey().getValue()), new String(p.getValue()));
count++;
}
scanner.close();
}
if (count > 0)
System.out.println();
System.out.printf("%d key-value pairs got.\n", count);
} else if (opName.equals("copy_data")) {
PegasusClientInterface targetClient = PegasusClientFactory.createClient(targetClusterConfigPath);
try {
PegasusTableInterface targetTable = targetClient.openTable(targetTableName);
List<PegasusScannerInterface> scanners = client.getUnorderedScanners(appName, 1, new ScanOptions());
int count = 0;
if (scanners.size() > 0) {
PegasusScannerInterface scanner = scanners.get(0);
Pair<Pair<byte[], byte[]>, byte[]> p;
while (count < maxCount && (p = scanner.next()) != null) {
byte[] newValue = p.getValue();
switch (readUncompressType) {
case none:
break;
case zstd:
newValue = ZstdWrapper.decompress(newValue);
break;
}
switch (writeCompressType) {
case none:
break;
case zstd:
newValue = ZstdWrapper.compress(newValue);
break;
}
targetTable.set(p.getKey().getKey(), p.getKey().getValue(), newValue, 0);
count++;
if (count % 10000 == 0) {
System.out.printf("Copied %d key-value pairs.\n", count);
}
}
scanner.close();
}
System.out.printf("Done, copied %d key-value pairs.\n", count);
} finally {
targetClient.close();
}
}
}
catch (PException e) {
Expand Down

0 comments on commit 092e142

Please sign in to comment.