博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hbase thrift java util
阅读量:5050 次
发布时间:2019-06-12

本文共 9864 字,大约阅读时间需要 32 分钟。

转载:

pom.xml

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-thrift</artifactId>
    <version>1.3.1</version>
</dependency>

ThriftUtil.java

package hbase;

import java.lang.Object;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * Convenience wrapper around the HBase Thrift {@code Hbase.Client}.
 *
 * <p>Callers work with {@code String} table names, row keys, columns and values; this class
 * converts them to and from the {@link ByteBuffer}s the Thrift client expects, always using
 * {@link #CHAR_SET}. The transport must be opened with {@link #openTransport()} before any
 * call and closed with {@link #closeTransport()} afterwards.
 */
public class ThriftUtil {

    protected Hbase.Client hbaseClient = null;
    private String hbaseAddr = "";
    private Integer hbasePort = 0;
    private TTransport socket = null;
    private TProtocol protocol = null;

    /** Name of the charset used for every String/byte conversion in this class. */
    protected static final String CHAR_SET = "UTF-8";

    /** Resolved form of {@link #CHAR_SET}, so conversions never fall back to the platform default. */
    private static final Charset CHARSET = Charset.forName(CHAR_SET);

    /**
     * Builds the socket, binary protocol and client for the given Thrift server.
     * The connection is not opened here; call {@link #openTransport()}.
     *
     * @param addr host name or IP address of the HBase Thrift server
     * @param port port of the HBase Thrift server
     */
    public ThriftUtil(String addr, Integer port) {
        hbaseAddr = addr;
        hbasePort = port;
        socket = new TSocket(hbaseAddr, hbasePort);
        protocol = new TBinaryProtocol(socket, true, true);
        hbaseClient = new Hbase.Client(protocol);
    }

    /**
     * Demo entry point: connects to a hard-coded Thrift server and runs {@link #doSomeTest}.
     *
     * @param args unused
     * @throws TTransportException if closing the transport fails
     */
    public static void main(String[] args) throws TTransportException {
        ThriftUtil hbaseClient = null;
        try {
            hbaseClient = new ThriftUtil("192.168.101.144", 9090);
            hbaseClient.openTransport();
            doSomeTest(hbaseClient);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The client is still null if construction itself failed.
            if (hbaseClient != null) {
                hbaseClient.closeTransport();
            }
        }
    }

    /**
     * Demo scenario against table {@code test5}: scans the table, deletes row {@code r1},
     * then scans again. The commented-out calls show the other operations; the locals
     * that are only used by those calls are kept so they can be re-enabled directly.
     *
     * @param client an opened client
     * @throws TException if any Thrift call fails
     */
    public static void doSomeTest(ThriftUtil client) throws TException {
        String tableName = "test5";
        testScanTable(tableName, "", 1000, client);

        String rowKey_R1 = "r1";
        Map<String, String> kvpUpdate_r1 = new HashMap<String, String>();
        kvpUpdate_r1.put("cf:name", "val_20150618_0920_1");
        kvpUpdate_r1.put("cf:age", "12");
        // client.updateRow(tableName, rowKey_R1, kvpUpdate_r1);

        String rowKey_R2 = "r2";
        Map<String, String> kvpUpdate_r2 = new HashMap<String, String>();
        kvpUpdate_r2.put("cf:name", "val_201505181028_r2c1");
        kvpUpdate_r2.put("cf:age", "13");

        Map<String, Map<String, String>> rowBatchData = new HashMap<String, Map<String, String>>();
        rowBatchData.put(rowKey_R1, kvpUpdate_r1);
        rowBatchData.put(rowKey_R2, kvpUpdate_r2);
        // client.updateRows(tableName, rowBatchData);

        Map<String, String> attributes = new HashMap<String, String>();
        // List<TRowResult> rowRslts = client.getRow("test5", "r2", attributes);

        Map<String, String> kvpUpdate_bs = new HashMap<String, String>();
        kvpUpdate_bs.put("cf:name", "val_20150519_1352");
        kvpUpdate_bs.put("cf:age", "14");
        // client.updateRow(tableName, "r2", kvpUpdate_bs);
        // client.deleteCell(tableName, "r2", "cf:name");

        List<String> columns = new ArrayList<String>();
        columns.add("cf:name");
        columns.add("cf:age");
        // client.deleteCells(tableName, "r1", columns);

        client.deleteRow(tableName, "r1");
        testScanTable(tableName, "", 1000, client);
        System.out.println("Done.");
    }

    /**
     * Fetches one row and prints each result via {@link #iterateResults}.
     *
     * @param tableName table to read
     * @param rowKey    row key to fetch
     * @param client    an opened client
     * @throws TException if the Thrift call fails
     */
    public static void testIterateRow(String tableName, String rowKey, ThriftUtil client) throws TException {
        Map<String, String> attributes = new HashMap<String, String>();
        List<TRowResult> results = client.getRow(tableName, rowKey, attributes);
        for (TRowResult rslt : results) {
            client.iterateResults(rslt);
        }
    }

    /**
     * Scans a table from {@code startRow}, printing rows in batches of {@code rowCnt}
     * until the scanner returns an empty batch. The scanner is always closed.
     *
     * @param tableName table to scan
     * @param startRow  first row key of the scan ({@code ""} is what the demo passes)
     * @param rowCnt    number of rows requested per batch
     * @param client    an opened client
     * @throws TException if any Thrift call fails
     */
    public static void testScanTable(String tableName, String startRow, int rowCnt, ThriftUtil client) throws TException {
        List<String> columns = new ArrayList<String>(0);
        Map<String, String> attributesTest = new HashMap<String, String>();
        int scannerID = client.scannerOpen(tableName, startRow, columns, attributesTest);
        try {
            List<TRowResult> scanResults = client.scannerGetList(scannerID, rowCnt);
            while (scanResults != null && !scanResults.isEmpty()) {
                for (TRowResult rslt : scanResults) {
                    client.iterateResults(rslt);
                }
                scanResults = client.scannerGetList(scannerID, rowCnt);
            }
        } finally {
            client.scannerClose(scannerID);
        }
    }

    /**
     * Prints the name of every table reported by {@code client}.
     *
     * @param client the client to query
     * @throws TTransportException declared by {@link #getTableNames()}
     */
    public void listTableNames(ThriftUtil client) throws TTransportException {
        List<String> tblNames = client.getTableNames();
        if (tblNames == null) {
            // getTableNames() reports failure by returning null; nothing to print.
            return;
        }
        for (String name : tblNames) {
            System.out.println(">> " + name);
        }
    }

    /**
     * Deletes an entire row.
     *
     * @param table  table name
     * @param rowKey row key to delete
     * @throws TException if the Thrift call fails
     */
    public void deleteRow(String table, String rowKey) throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        ByteBuffer row = getByteBuffer(rowKey);
        hbaseClient.deleteAllRow(tableName, row, getAttributesMap(new HashMap<String, String>()));
    }

    /**
     * Deletes a single column of a row; delegates to {@link #deleteCells}.
     *
     * @param table  table name
     * @param rowKey row key
     * @param column column in {@code family:qualifier} form, as used by the demo
     * @throws TException if the Thrift call fails
     */
    public void deleteCell(String table, String rowKey, String column) throws TException {
        List<String> columns = new ArrayList<String>(1);
        columns.add(column);
        deleteCells(table, rowKey, columns);
    }

    /**
     * Deletes the given columns of a row in one {@code mutateRow} call.
     *
     * @param table   table name
     * @param rowKey  row key
     * @param columns columns to delete
     * @throws TException if the Thrift call fails
     */
    public void deleteCells(String table, String rowKey, List<String> columns) throws TException {
        // NOTE(review): WAL writes are disabled, as in the original; confirm this is intended.
        boolean writeToWal = false;
        List<Mutation> mutations = new ArrayList<Mutation>();
        for (String column : columns) {
            // First argument is the isDelete flag. It must be true here; with false the
            // mutation is a put carrying no value rather than a delete.
            mutations.add(new Mutation(true, getByteBuffer(column), null, writeToWal));
        }
        ByteBuffer tableName = getByteBuffer(table);
        ByteBuffer row = getByteBuffer(rowKey);
        hbaseClient.mutateRow(tableName, row, mutations, getAttributesMap(new HashMap<String, String>()));
    }

    /**
     * Writes the given column/value pairs to one row.
     *
     * @param table   table name
     * @param rowKey  row key
     * @param rowData column name to value
     * @throws TException if the Thrift call fails
     */
    public void updateRow(String table, String rowKey, Map<String, String> rowData) throws TException {
        // NOTE(review): WAL writes are disabled, as in the original; confirm this is intended.
        boolean writeToWal = false;
        Map<String, String> attributes = new HashMap<String, String>();
        List<Mutation> mutations = new ArrayList<Mutation>();
        for (Map.Entry<String, String> entry : rowData.entrySet()) {
            mutations.add(new Mutation(false, getByteBuffer(entry.getKey()), getByteBuffer(entry.getValue()), writeToWal));
        }
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        ByteBuffer tableName = getByteBuffer(table);
        ByteBuffer row = getByteBuffer(rowKey);
        hbaseClient.mutateRow(tableName, row, mutations, wrappedAttributes);
    }

    /**
     * Writes several rows in one {@code mutateRows} call.
     *
     * @param table        table name
     * @param rowBatchData row key to (column name to value)
     * @throws TException if the Thrift call fails
     */
    public void updateRows(String table, Map<String, Map<String, String>> rowBatchData) throws TException {
        // NOTE(review): WAL writes are disabled, as in the original; confirm this is intended.
        boolean writeToWal = false;
        Map<String, String> attributes = new HashMap<String, String>();
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        ByteBuffer tableNameByte = getByteBuffer(table);
        List<BatchMutation> rowBatches = new ArrayList<BatchMutation>();
        for (Entry<String, Map<String, String>> batchEntry : rowBatchData.entrySet()) {
            List<Mutation> mutations = new ArrayList<Mutation>();
            for (Map.Entry<String, String> rowData : batchEntry.getValue().entrySet()) {
                mutations.add(new Mutation(false, getByteBuffer(rowData.getKey()), getByteBuffer(rowData.getValue()), writeToWal));
            }
            BatchMutation batchMutation = new BatchMutation(getByteBuffer(batchEntry.getKey()), mutations);
            rowBatches.add(batchMutation);
        }
        hbaseClient.mutateRows(tableNameByte, rowBatches, wrappedAttributes);
    }

    /**
     * Prints the row key and every column/value of a row result to standard output,
     * decoding all bytes with {@link #CHAR_SET}.
     *
     * @param result the row to print
     */
    public void iterateResults(TRowResult result) {
        Iterator<Entry<ByteBuffer, TCell>> iter = result.columns.entrySet().iterator();
        System.out.println("RowKey:" + new String(result.getRow(), CHARSET));
        while (iter.hasNext()) {
            Entry<ByteBuffer, TCell> entry = iter.next();
            System.out.println("\tCol=" + new String(toBytes(entry.getKey()), CHARSET)
                    + ", Value=" + new String(entry.getValue().getValue(), CHARSET));
        }
    }

    /**
     * Fetches up to {@code nbRows} rows from an open scanner.
     *
     * @param id     scanner id returned by {@code scannerOpen}
     * @param nbRows maximum number of rows to fetch
     * @return the rows returned by the Thrift client
     * @throws TException if the Thrift call fails
     */
    public List<TRowResult> scannerGetList(int id, int nbRows) throws TException {
        return hbaseClient.scannerGetList(id, nbRows);
    }

    /**
     * Fetches at most one row from an open scanner.
     *
     * @param id scanner id returned by {@code scannerOpen}
     * @return the rows returned by the Thrift client
     * @throws TException if the Thrift call fails
     */
    public List<TRowResult> scannerGet(int id) throws TException {
        return hbaseClient.scannerGetList(id, 1);
    }

    /**
     * Opens a scanner over the range from {@code startRow} to {@code stopRow}.
     *
     * @param table      table name
     * @param startRow   first row key of the scan
     * @param stopRow    row key at which the scan stops
     * @param columns    columns to return
     * @param attributes scan attributes; may be empty
     * @return the scanner id
     * @throws TException if the Thrift call fails
     */
    public int scannerOpen(String table, String startRow, String stopRow, List<String> columns,
            Map<String, String> attributes) throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        List<ByteBuffer> blist = getColumnsByte(columns);
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        return hbaseClient.scannerOpenWithStop(tableName, getByteBuffer(startRow), getByteBuffer(stopRow), blist, wrappedAttributes);
    }

    /**
     * Opens a scanner starting at {@code startRow} with no stop row.
     *
     * @param table      table name
     * @param startRow   first row key of the scan
     * @param columns    columns to return
     * @param attributes scan attributes; may be empty
     * @return the scanner id
     * @throws TException if the Thrift call fails
     */
    public int scannerOpen(String table, String startRow, List<String> columns,
            Map<String, String> attributes) throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        List<ByteBuffer> blist = getColumnsByte(columns);
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        return hbaseClient.scannerOpen(tableName, getByteBuffer(startRow), blist, wrappedAttributes);
    }

    /**
     * Closes a scanner.
     *
     * @param id scanner id returned by {@code scannerOpen}
     * @throws TException if the Thrift call fails
     */
    public void scannerClose(int id) throws TException {
        hbaseClient.scannerClose(id);
    }

    /**
     * Converts column names to {@link ByteBuffer}s using {@link #getByteBuffer}.
     *
     * @param columns column names
     * @return one buffer per column, in the same order
     */
    public List<ByteBuffer> getColumnsByte(List<String> columns) {
        List<ByteBuffer> blist = new ArrayList<ByteBuffer>();
        for (String column : columns) {
            blist.add(getByteBuffer(column));
        }
        return blist;
    }

    /**
     * Copies the remaining bytes of {@code buffer} into a new array.
     *
     * <p>Reads through a duplicate so the caller's position and limit are untouched. This
     * matters for buffers that are still keys of a map, since a {@link ByteBuffer}'s hash
     * depends on its remaining content.
     *
     * @param buffer source buffer
     * @return the bytes between the buffer's position and limit
     */
    protected byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /**
     * Fetches a row.
     *
     * @param table      table name
     * @param row        row key
     * @param attributes request attributes; may be empty
     * @return the rows returned by the Thrift client
     * @throws TException if the Thrift call fails
     */
    public List<TRowResult> getRow(String table, String row, Map<String, String> attributes) throws TException {
        ByteBuffer tableName = getByteBuffer(table);
        Map<ByteBuffer, ByteBuffer> wrappedAttributes = getAttributesMap(attributes);
        return hbaseClient.getRow(tableName, getByteBuffer(row), wrappedAttributes);
    }

    /**
     * Lists all table names.
     *
     * @return the table names, or {@code null} if the lookup failed (the exception is
     *         printed to standard error); callers must check for {@code null}
     * @throws TTransportException kept in the signature for compatibility; failures are
     *         currently caught and reported via the {@code null} return
     */
    public List<String> getTableNames() throws TTransportException {
        ArrayList<String> tableNames = new ArrayList<String>();
        try {
            for (ByteBuffer name : hbaseClient.getTableNames()) {
                tableNames.add(byteBufferToString(name));
            }
            return tableNames;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Converts a String attribute map to the {@link ByteBuffer} form used by the Thrift calls.
     *
     * @param attributes attributes; may be {@code null} or empty
     * @return the converted map, or {@code null} when there are no attributes
     */
    private static Map<ByteBuffer, ByteBuffer> getAttributesMap(Map<String, String> attributes) {
        Map<ByteBuffer, ByteBuffer> attributesMap = null;
        if (attributes != null && !attributes.isEmpty()) {
            attributesMap = new HashMap<ByteBuffer, ByteBuffer>();
            for (Map.Entry<String, String> entry : attributes.entrySet()) {
                attributesMap.put(getByteBuffer(entry.getKey()), getByteBuffer(entry.getValue()));
            }
        }
        return attributesMap;
    }

    /**
     * Decodes the remaining bytes of {@code buffer} as {@link #CHAR_SET} text.
     *
     * <p>Decodes a duplicate, so the caller's buffer keeps its position and limit.
     *
     * @param buffer bytes to decode
     * @return the decoded text, or {@code null} if decoding failed (the exception is
     *         printed to standard error)
     */
    public static String byteBufferToString(ByteBuffer buffer) {
        try {
            CharsetDecoder decoder = CHARSET.newDecoder();
            CharBuffer charBuffer = decoder.decode(buffer.duplicate());
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    /**
     * Encodes a String as {@link #CHAR_SET} bytes wrapped in a {@link ByteBuffer}.
     *
     * @param str text to encode; must not be {@code null}
     * @return a buffer wrapping the encoded bytes
     */
    public static ByteBuffer getByteBuffer(String str) {
        return ByteBuffer.wrap(str.getBytes(CHARSET));
    }

    /**
     * Opens the underlying transport if one was created.
     *
     * @throws TTransportException if the connection cannot be opened
     */
    public void openTransport() throws TTransportException {
        if (socket != null) {
            socket.open();
        }
    }

    /**
     * Closes the underlying transport if one was created.
     *
     * @throws TTransportException kept in the signature for compatibility
     */
    public void closeTransport() throws TTransportException {
        if (socket != null) {
            socket.close();
        }
    }
}

 

demo:

相关资料:

 

转载于:https://www.cnblogs.com/libin2015/p/9506558.html

你可能感兴趣的文章
mysql 多表管理修改
查看>>
group by order by
查看>>
bzoj 5252: [2018多省省队联测]林克卡特树
查看>>
https 学习笔记三
查看>>
Oracle学习之简单查询
查看>>
log4j配置
查看>>
linux 配置SAN存储-IPSAN
查看>>
双链表
查看>>
java学习笔记之String类
查看>>
pymysql操作mysql
查看>>
Linux服务器删除乱码文件/文件夹的方法
查看>>
牛腩记账本core版本源码
查看>>
Word Break II
查看>>
UVA 11082 Matrix Decompressing 矩阵解压(最大流,经典)
查看>>
BZOJ4669抢夺(费用流+二分答案)
查看>>
bzoj1606
查看>>
jdk从1.8降到jdk1.7失败
查看>>
一些关于IO流的问题
查看>>
mongo备份操作
查看>>
8 -- 深入使用Spring -- 3...1 Resource实现类InputStreamResource、ByteArrayResource
查看>>