转载:
pom.xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-thrift</artifactId>
    <version>1.3.1</version>
</dependency>
ThriftUtil.java
package hbase;

import java.lang.Object;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * Small convenience wrapper around the generated HBase Thrift client
 * ({@link Hbase.Client}) that lets callers work with {@code String} table
 * names, row keys, columns and values instead of raw {@link ByteBuffer}s.
 *
 * <p>All String/byte conversions performed by this class use {@link #CHAR_SET}.
 * The transport must be opened with {@link #openTransport()} before any
 * data call and closed with {@link #closeTransport()} afterwards.
 */
public class ThriftUtil {

    /** Name of the charset used for every String/byte conversion in this class. */
    protected static final String CHAR_SET = "UTF-8";

    /** Resolved {@link #CHAR_SET}, cached so conversions never fall back to the platform default. */
    private static final Charset CHARSET = Charset.forName(CHAR_SET);

    /**
     * Value passed as the fourth ({@code writeToWal}) argument of every
     * {@link Mutation} built by this class. Kept at {@code false}, as in the
     * original sample.
     */
    private static final boolean WRITE_TO_WAL = false;

    protected Hbase.Client hbaseClient = null;
    private String hbaseAddr = "";
    private Integer hbasePort = 0;
    private TTransport socket = null;
    private TProtocol protocol = null;

    /**
     * Builds the socket, binary protocol and Thrift client. The socket is NOT
     * opened here; call {@link #openTransport()}.
     *
     * @param addr host name or IP address of the HBase Thrift server
     * @param port port of the HBase Thrift server
     */
    public ThriftUtil(String addr, Integer port) {
        hbaseAddr = addr;
        hbasePort = port;
        socket = new TSocket(hbaseAddr, hbasePort);
        protocol = new TBinaryProtocol(socket, true, true);
        hbaseClient = new Hbase.Client(protocol);
    }

    /**
     * Demo entry point: connects to a hard-coded Thrift server, runs
     * {@link #doSomeTest(ThriftUtil)} and closes the transport.
     */
    public static void main(String[] args) throws TTransportException {
        ThriftUtil hbaseClient = null;
        try {
            hbaseClient = new ThriftUtil("192.168.101.144", 9090);
            hbaseClient.openTransport();
            doSomeTest(hbaseClient);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The constructor itself may have failed, leaving the reference null.
            if (hbaseClient != null) {
                hbaseClient.closeTransport();
            }
        }
    }

    /**
     * Sample scenario: scans table {@code test5}, deletes row {@code r1} and
     * scans again. The commented-out calls are alternative operations that can
     * be enabled one at a time; the maps/lists built here are their inputs.
     */
    public static void doSomeTest(ThriftUtil client) throws TException {
        String tableName = "test5";
        testScanTable(tableName, "", 1000, client);

        String rowKey_R1 = "r1";
        Map<String, String> kvpUpdate_r1 = new HashMap<String, String>();
        kvpUpdate_r1.put("cf:name", "val_20150618_0920_1");
        kvpUpdate_r1.put("cf:age", "12");
        // client.updateRow(tableName, rowKey_R1, kvpUpdate_r1);

        String rowKey_R2 = "r2";
        Map<String, String> kvpUpdate_r2 = new HashMap<String, String>();
        kvpUpdate_r2.put("cf:name", "val_201505181028_r2c1");
        kvpUpdate_r2.put("cf:age", "13");

        Map<String, Map<String, String>> rowBatchData = new HashMap<String, Map<String, String>>();
        rowBatchData.put(rowKey_R1, kvpUpdate_r1);
        rowBatchData.put(rowKey_R2, kvpUpdate_r2);
        // client.updateRows(tableName, rowBatchData);

        Map<String, String> attributes = new HashMap<String, String>();
        // List<TRowResult> rowRslts = client.getRow("test5", "r2", attributes);

        Map<String, String> kvpUpdate_bs = new HashMap<String, String>();
        kvpUpdate_bs.put("cf:name", "val_20150519_1352");
        kvpUpdate_bs.put("cf:age", "14");
        // client.updateRow(tableName, "r2", kvpUpdate_bs);
        // client.deleteCell(tableName, "r2", "cf:name");

        List<String> columns = new ArrayList<String>();
        columns.add("cf:name");
        columns.add("cf:age");
        // client.deleteCells(tableName, "r1", columns);

        client.deleteRow(tableName, "r1");
        testScanTable(tableName, "", 1000, client);
        System.out.println("Done.");
    }

    /** Fetches one row and prints every cell of every returned result. */
    public static void testIterateRow(String tableName, String rowKey, ThriftUtil client) throws TException {
        Map<String, String> attributes = new HashMap<String, String>();
        List<TRowResult> results = client.getRow(tableName, rowKey, attributes);
        for (TRowResult rslt : results) {
            client.iterateResults(rslt);
        }
    }

    /**
     * Opens a scanner at {@code startRow}, prints rows in batches of
     * {@code rowCnt} until an empty batch is returned, and always closes the
     * scanner.
     */
    public static void testScanTable(String tableName, String startRow, int rowCnt, ThriftUtil client)
            throws TException {
        List<String> columns = new ArrayList<String>(0);
        Map<String, String> attributesTest = new HashMap<String, String>();
        int scannerID = client.scannerOpen(tableName, startRow, columns, attributesTest);
        try {
            List<TRowResult> scanResults = client.scannerGetList(scannerID, rowCnt);
            while (scanResults != null && !scanResults.isEmpty()) {
                for (TRowResult rslt : scanResults) {
                    client.iterateResults(rslt);
                }
                scanResults = client.scannerGetList(scannerID, rowCnt);
            }
        } finally {
            client.scannerClose(scannerID);
        }
    }

    /** Prints the name of every table reported by {@code client}. */
    public void listTableNames(ThriftUtil client) throws TTransportException {
        List<String> tblNames = client.getTableNames();
        for (String name : tblNames) {
            System.out.println(">> " + name);
        }
    }

    /** Deletes all cells of the given row. */
    public void deleteRow(String table, String rowKey) throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        ByteBuffer row = getByteBuffer(rowKey);
        hbaseClient.deleteAllRow(tableName, row, getAttributesMap(new HashMap<String, String>()));
    }

    /** Deletes a single column of a row; delegates to {@link #deleteCells}. */
    public void deleteCell(String table, String rowKey, String column) throws TException {
        List<String> columns = new ArrayList<String>(1);
        columns.add(column);
        deleteCells(table, rowKey, columns);
    }

    /**
     * Deletes the given columns of a row.
     *
     * @param columns column names, e.g. {@code "cf:name"}
     */
    public void deleteCells(String table, String rowKey, List<String> columns) throws TException {
        List<Mutation> mutations = new ArrayList<Mutation>(columns.size());
        for (String column : columns) {
            // First argument is the isDelete flag: it must be true, otherwise
            // the mutation is sent as a put with no value instead of a delete.
            mutations.add(new Mutation(true, getByteBuffer(column), null, WRITE_TO_WAL));
        }
        ByteBuffer tableName = getByteBuffer(table);
        ByteBuffer row = getByteBuffer(rowKey);
        hbaseClient.mutateRow(tableName, row, mutations, getAttributesMap(new HashMap<String, String>()));
    }

    /**
     * Writes the given column/value pairs to one row.
     *
     * @param rowData map of column name to value
     */
    public void updateRow(String table, String rowKey, Map<String, String> rowData) throws TException {
        Map<String, String> attributes = new HashMap<String, String>();
        List<Mutation> mutations = toPutMutations(rowData);
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        ByteBuffer tableName = getByteBuffer(table);
        ByteBuffer row = getByteBuffer(rowKey);
        hbaseClient.mutateRow(tableName, row, mutations, wrappedAttributes);
    }

    /**
     * Writes several rows in one {@code mutateRows} call.
     *
     * @param rowBatchData map of row key to (column name to value)
     */
    public void updateRows(String table, Map<String, Map<String, String>> rowBatchData) throws TException {
        Map<String, String> attributes = new HashMap<String, String>();
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        ByteBuffer tableNameByte = getByteBuffer(table);
        List<BatchMutation> rowBatches = new ArrayList<BatchMutation>(rowBatchData.size());
        for (Entry<String, Map<String, String>> batchEntry : rowBatchData.entrySet()) {
            List<Mutation> mutations = toPutMutations(batchEntry.getValue());
            rowBatches.add(new BatchMutation(getByteBuffer(batchEntry.getKey()), mutations));
        }
        hbaseClient.mutateRows(tableNameByte, rowBatches, wrappedAttributes);
    }

    /** Converts a column-to-value map into non-delete mutations. */
    private static List<Mutation> toPutMutations(Map<String, String> rowData) {
        List<Mutation> mutations = new ArrayList<Mutation>(rowData.size());
        for (Map.Entry<String, String> entry : rowData.entrySet()) {
            mutations.add(new Mutation(false, getByteBuffer(entry.getKey()),
                    getByteBuffer(entry.getValue()), WRITE_TO_WAL));
        }
        return mutations;
    }

    /** Prints the row key and every column/value pair of {@code result} to stdout. */
    public void iterateResults(TRowResult result) {
        System.out.println("RowKey:" + new String(result.getRow(), CHARSET));
        for (Entry<ByteBuffer, TCell> entry : result.columns.entrySet()) {
            System.out.println("\tCol=" + new String(toBytes(entry.getKey()), CHARSET)
                    + ", Value=" + new String(entry.getValue().getValue(), CHARSET));
        }
    }

    /** Returns up to {@code nbRows} rows from the scanner identified by {@code id}. */
    public List<TRowResult> scannerGetList(int id, int nbRows) throws TException {
        return hbaseClient.scannerGetList(id, nbRows);
    }

    /** Returns at most one row from the scanner identified by {@code id}. */
    public List<TRowResult> scannerGet(int id) throws TException {
        return hbaseClient.scannerGetList(id, 1);
    }

    /**
     * Opens a scanner over {@code table} from {@code startRow} up to {@code stopRow}.
     *
     * @return the scanner id to pass to {@link #scannerGetList} and {@link #scannerClose}
     */
    public int scannerOpen(String table, String startRow, String stopRow, List<String> columns,
            Map<String, String> attributes) throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        List<ByteBuffer> blist = getColumnsByte(columns);
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        return hbaseClient.scannerOpenWithStop(tableName, getByteBuffer(startRow), getByteBuffer(stopRow),
                blist, wrappedAttributes);
    }

    /**
     * Opens a scanner over {@code table} starting at {@code startRow}.
     *
     * @return the scanner id to pass to {@link #scannerGetList} and {@link #scannerClose}
     */
    public int scannerOpen(String table, String startRow, List<String> columns, Map<String, String> attributes)
            throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        List<ByteBuffer> blist = getColumnsByte(columns);
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        return hbaseClient.scannerOpen(tableName, getByteBuffer(startRow), blist, wrappedAttributes);
    }

    /** Closes the scanner identified by {@code id}. */
    public void scannerClose(int id) throws TException {
        hbaseClient.scannerClose(id);
    }

    /** Encodes each column name as a {@link ByteBuffer}, preserving order. */
    public List<ByteBuffer> getColumnsByte(List<String> columns) {
        List<ByteBuffer> blist = new ArrayList<ByteBuffer>(columns.size());
        for (String column : columns) {
            blist.add(getByteBuffer(column));
        }
        return blist;
    }

    /**
     * Copies the readable bytes (position to limit) of {@code buffer} into a
     * new array without changing the buffer's position.
     */
    protected byte[] toBytes(ByteBuffer buffer) {
        // Read through a duplicate: the argument may be a HashMap key (see
        // iterateResults), and its hash depends on position/limit.
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /** Fetches the row with the given key from {@code table}. */
    public List<TRowResult> getRow(String table, String row, Map<String, String> attributes) throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        return hbaseClient.getRow(tableName, getByteBuffer(row), wrappedAttributes);
    }

    /**
     * Returns the names of all tables reported by the server.
     *
     * @return the table names; an empty list (never {@code null}) if the call
     *         fails, in which case the stack trace is printed
     */
    public List<String> getTableNames() throws TTransportException {
        List<String> tableNames = new ArrayList<String>();
        try {
            for (ByteBuffer name : hbaseClient.getTableNames()) {
                tableNames.add(byteBufferToString(name));
            }
            return tableNames;
        } catch (Exception e) {
            // Best-effort lookup, as in the original; report and return an
            // empty result so callers can iterate without a null check.
            e.printStackTrace();
            return new ArrayList<String>();
        }
    }

    /**
     * Encodes an attribute map for the Thrift API.
     *
     * @return the encoded map, or {@code null} when {@code attributes} is
     *         {@code null} or empty
     */
    private static Map<ByteBuffer, ByteBuffer> getAttributesMap(Map<String, String> attributes) {
        if (attributes == null || attributes.isEmpty()) {
            return null;
        }
        Map<ByteBuffer, ByteBuffer> attributesMap = new HashMap<ByteBuffer, ByteBuffer>();
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            attributesMap.put(getByteBuffer(entry.getKey()), getByteBuffer(entry.getValue()));
        }
        return attributesMap;
    }

    /**
     * Decodes the readable bytes of {@code buffer} using {@link #CHAR_SET}.
     * The buffer's position and limit are left untouched.
     *
     * @return the decoded text, or {@code null} if the bytes cannot be decoded
     */
    public static String byteBufferToString(ByteBuffer buffer) {
        try {
            CharsetDecoder decoder = CHARSET.newDecoder();
            // Decode a duplicate so the caller's buffer is not consumed.
            CharBuffer charBuffer = decoder.decode(buffer.duplicate());
            return charBuffer.toString();
        } catch (CharacterCodingException ex) {
            ex.printStackTrace();
            return null;
        }
    }

    /** Encodes {@code str} with {@link #CHAR_SET} and wraps the bytes in a {@link ByteBuffer}. */
    public static ByteBuffer getByteBuffer(String str) {
        return ByteBuffer.wrap(str.getBytes(CHARSET));
    }

    /** Opens the underlying socket if one was created. */
    public void openTransport() throws TTransportException {
        if (socket != null) {
            socket.open();
        }
    }

    /** Closes the underlying socket if one was created. */
    public void closeTransport() throws TTransportException {
        if (socket != null) {
            socket.close();
        }
    }
}
demo:
相关资料: