-
HBase Java简单示例
添加时间:2013-7-19 点击量:Hbase采取Java实现,原生客户端也是Java实现,其他说话须要经由过程thritf接口办事间接接见Hbase的数据。
Hbase作为大数据存储数据库,其写才能很是强,加上Hbase本身就脱胎于Hadoop故和Hadoop的兼容性极好,很是合适于存储半规矩数据(灵活、可扩大性强、大数据存储)。基于Hadoop的mapreduce + Hbase存储,很是合适处理惩罚大数据。
Hbase根蒂根基应用示例:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseTest {
public static Configuration configuration;
static {
configuration = HBaseConfiguration.create();
configuration.set(hbase.zookeeper.property.clientPort, 2181);
configuration.set(hbase.zookeeper.quorum, 192.168.1.100);
configuration.set(hbase.master, 192.168.1.100:600000);
}
public static void main(String[] args) {
// createTable(wujintao);
// Data(wujintao);
// QueryAll(wujintao);
// QueryByCondition1(wujintao);
// QueryByCondition2(wujintao);
//QueryByCondition3(wujintao);
//Row(wujintao,abcdef);
ByCondition(wujintao,abcdef);
}
public static void createTable(String tableName) {
System.out.println(start create table ......);
try {
HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);
if (hBaseAdmin.tableExists(tableName)) {// 若是存在要创建的表,那么先删除,再创建
hBaseAdmin.disableTable(tableName);
hBaseAdmin.Table(tableName);
System.out.println(tableName + is exist,detele....);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
tableDescriptor.addFamily(new HColumnDescriptor(column1));
tableDescriptor.addFamily(new HColumnDescriptor(column2));
tableDescriptor.addFamily(new HColumnDescriptor(column3));
hBaseAdmin.createTable(tableDescriptor);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(end create table ......);
}
public static void Data(String tableName) {
System.out.println(start data ......);
HTablePool pool = new HTablePool(configuration, 1000);
HTable table = (HTable) pool.getTable(tableName);
Put put = new Put(112233bbbcccc.getBytes());// 一个PUT代表一行数据,再NEW一个PUT默示第二行数据,每行一个独一的ROWKEY,此处rowkey为put机关办法中传入的值
put.add(column1.getBytes(), null, aaa.getBytes());// 本行数据的第一列
put.add(column2.getBytes(), null, bbb.getBytes());// 本行数据的第三列
put.add(column3.getBytes(), null, ccc.getBytes());// 本行数据的第三列
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(end data ......);
}
public static void dropTable(String tableName) {
try {
HBaseAdmin admin = new HBaseAdmin(configuration);
admin.disableTable(tableName);
admin.Table(tableName);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void Row(String tablename, String rowkey) {
try {
HTable table = new HTable(configuration, tablename);
List list = new ArrayList();
Delete d1 = new Delete(rowkey.getBytes());
list.add(d1);
table.(list);
System.out.println(删除行成功!);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void ByCondition(String tablename, String rowkey) {
//今朝还没有发明有效的API可以或许实现按照非rowkey的前提删除这个功能能,还有清空表全部数据的API操纵
}
public static void QueryAll(String tableName) {
HTablePool pool = new HTablePool(configuration, 1000);
HTable table = (HTable) pool.getTable(tableName);
try {
ResultScanner rs = table.getScanner(new Scan());
for (Result r : rs) {
System.out.println(获获得rowkey: + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println(列: + new String(keyValue.getFamily())
+ ====值: + new String(keyValue.getValue()));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void QueryByCondition1(String tableName) {
HTablePool pool = new HTablePool(configuration, 1000);
HTable table = (HTable) pool.getTable(tableName);
try {
Get scan = new Get(abcdef.getBytes());// 按照rowkey查询
Result r = table.get(scan);
System.out.println(获获得rowkey: + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println(列: + new String(keyValue.getFamily())
+ ====值: + new String(keyValue.getValue()));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void QueryByCondition2(String tableName) {
try {
HTablePool pool = new HTablePool(configuration, 1000);
HTable table = (HTable) pool.getTable(tableName);
Filter filter = new SingleColumnValueFilter(Bytes
.toBytes(column1), null, CompareOp.EQUAL, Bytes
.toBytes(aaa)); // 当列column1的值为aaa时进行查询
Scan s = new Scan();
s.setFilter(filter);
ResultScanner rs = table.getScanner(s);
for (Result r : rs) {
System.out.println(获获得rowkey: + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println(列: + new String(keyValue.getFamily())
+ ====值: + new String(keyValue.getValue()));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void QueryByCondition3(String tableName) {
try {
HTablePool pool = new HTablePool(configuration, 1000);
HTable table = (HTable) pool.getTable(tableName);
List<Filter> filters = new ArrayList<Filter>();
Filter filter1 = new SingleColumnValueFilter(Bytes
.toBytes(column1), null, CompareOp.EQUAL, Bytes
.toBytes(aaa));
filters.add(filter1);
Filter filter2 = new SingleColumnValueFilter(Bytes
.toBytes(column2), null, CompareOp.EQUAL, Bytes
.toBytes(bbb));
filters.add(filter2);
Filter filter3 = new SingleColumnValueFilter(Bytes
.toBytes(column3), null, CompareOp.EQUAL, Bytes
.toBytes(ccc));
filters.add(filter3);
FilterList filterList1 = new FilterList(filters);
Scan scan = new Scan();
scan.setFilter(filterList1);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
System.out.println(获获得rowkey: + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println(列: + new String(keyValue.getFamily())
+ ====值: + new String(keyValue.getValue()));
}
}
rs.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}Hbase数据获取示例:
/
Need Packages:
commons-codec-1.4.jar
commons-logging-1.1.1.jar
hadoop-0.20.2-core.jar
hbase-0.90.2.jar
log4j-1.2.16.jar
zookeeper-3.3.2.jar
/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseSelecter
{
public static Configuration configuration = null;
static
{
configuration = HBaseConfiguration.create();
//configuration.set(hbase.master, 192.168.0.201:60000);
configuration.set(hbase.zookeeper.quorum, idc01-hd-nd-03,idc01-hd-nd-04,idc01-hd-nd-05);
//configuration.set(hbase.zookeeper.property.clientPort, 2181);
}
public static void RowKey(String tablename, String rowKey) throws IOException
{
HTable table = new HTable(configuration, tablename);
Get g = new Get(rowKey.getBytes());
Result rs = table.get(g);
for (KeyValue kv : rs.raw())
{
System.out.println(-------------------- + new String(kv.getRow()) + ----------------------------);
System.out.println(Column Family: + new String(kv.getFamily()));
System.out.println(Column : + new String(kv.getQualifier()));
System.out.println(value : + new String(kv.getValue()));
}
}
public static void RowKeyFamily(String tablename, String rowKey, String family) throws IOException
{
HTable table = new HTable(configuration, tablename);
Get g = new Get(rowKey.getBytes());
g.addFamily(Bytes.toBytes(family));
Result rs = table.get(g);
for (KeyValue kv : rs.raw())
{
System.out.println(-------------------- + new String(kv.getRow()) + ----------------------------);
System.out.println(Column Family: + new String(kv.getFamily()));
System.out.println(Column : + new String(kv.getQualifier()));
System.out.println(value : + new String(kv.getValue()));
}
}
public static void RowKeyFamilyColumn(String tablename, String rowKey, String family, String column)
throws IOException
{
HTable table = new HTable(configuration, tablename);
Get g = new Get(rowKey.getBytes());
g.addColumn(family.getBytes(), column.getBytes());
Result rs = table.get(g);
for (KeyValue kv : rs.raw())
{
System.out.println(-------------------- + new String(kv.getRow()) + ----------------------------);
System.out.println(Column Family: + new String(kv.getFamily()));
System.out.println(Column : + new String(kv.getQualifier()));
System.out.println(value : + new String(kv.getValue()));
}
}
public static void Filter(String tablename, List<String> arr) throws IOException
{
HTable table = new HTable(configuration, tablename);
Scan scan = new Scan();// 实例化一个遍历器
FilterList filterList = new FilterList(); // 过滤器List
for (String v : arr)
{ // 下标0为列簇,1为列名,3为前提
String[] wheres = v.split(,);
filterList.addFilter(new SingleColumnValueFilter(// 过滤器
wheres[0].getBytes(), wheres[1].getBytes(),
CompareOp.EQUAL,// 各个前提之间是 and 的关系
wheres[2].getBytes()));
}
scan.setFilter(filterList);
ResultScanner ResultScannerFilterList = table.getScanner(scan);
for (Result rs = ResultScannerFilterList.next(); rs != null; rs = ResultScannerFilterList.next())
{
for (KeyValue kv : rs.list())
{
System.out.println(-------------------- + new String(kv.getRow()) + ----------------------------);
System.out.println(Column Family: + new String(kv.getFamily()));
System.out.println(Column : + new String(kv.getQualifier()));
System.out.println(value : + new String(kv.getValue()));
}
}
}
public static void main(String[] args) throws Exception
{
if(args.length < 2){
System.out.println(Usage: HbaseSelecter table key);
System.exit(-1);
}
System.out.println(Table: + args[0] + , key: + args[1]);
RowKey(args[0], args[1]);
/
System.out.println(------------------------行键 查询----------------------------------);
RowKey(b2c, yihaodian1002865);
RowKey(b2c, yihaodian1003396);
System.out.println(------------------------行键+列簇 查询----------------------------------);
RowKeyFamily(riapguh, 用户A, user);
RowKeyFamily(riapguh, 用户B, user);
System.out.println(------------------------行键+列簇+列名 查询----------------------------------);
RowKeyFamilyColumn(riapguh, 用户A, user, user_code);
RowKeyFamilyColumn(riapguh, 用户B, user, user_code);
System.out.println(------------------------前提 查询----------------------------------);
List<String> arr = new ArrayList<String>();
arr.add(dpt,dpt_code,d_001);
arr.add(user,user_code,u_0001);
Filter(riapguh, arr);
/
}
}Hbase 导出特定列 示例(小量数据):
/
Need Packages:
commons-codec-1.4.jar
commons-logging-1.1.1.jar
hadoop-0.20.2-core.jar
hbase-0.90.2.jar
log4j-1.2.16.jar
zookeeper-3.3.2.jar
Example: javac -classpath ./:/data/chenzhenjing/code/panama/lib/hbase-0.90.2.jar:/data/chenzhenjing/code/panama/lib/hadoop-core-0.20-append-for-hbase.jar:/data/chenzhenjing/code/panama/lib/commons-logging-1.0.4.jar:/data/chenzhenjing/code/panama/lib/commons-lang-2.4.jar:/data/chenzhenjing/code/panama/lib/commons-io-1.2.jar:/data/chenzhenjing/code/panama/lib/zookeeper-3.3.2.jar:/data/chenzhenjing/code/panama/lib/log4j-1.2.15.jar:/data/chenzhenjing/code/panama/lib/commons-codec-1.3.jar DiffHbase.java
/
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
class ColumnUtils {
public static byte[] getFamily(String column){
return getBytes(column, 0);
}
public static byte[] getQualifier(String column){
return getBytes(column, 1);
}
private static byte[] getBytes(String column , int offset){
String[] split = column.split(:);
return Bytes.toBytes(offset > split.length -1 ? split[0] :split[offset]);
}
}
public class DiffHbase
{
public static Configuration configuration = null;
static
{
configuration = HBaseConfiguration.create();
configuration.set(hbase.zookeeper.quorum, idc01-hd-ds-01,idc01-hd-ds-02,idc01-hd-ds-03);
}
public static void RowKey(String tablename, String rowKey) throws IOException
{
HTable table = new HTable(configuration, tablename);
Get g = new Get(rowKey.getBytes());
Result rs = table.get(g);
for (KeyValue kv : rs.raw())
{
System.out.println(-------------------- + new String(kv.getRow()) + ----------------------------);
System.out.println(Column Family: + new String(kv.getFamily()));
System.out.println(Column : + new String(kv.getQualifier()) + t);
System.out.println(value : + new String(kv.getValue()));
}
}
public static void RowKeyFamily(String tablename, String rowKey, String family) throws IOException
{
HTable table = new HTable(configuration, tablename);
Get g = new Get(rowKey.getBytes());
g.addFamily(Bytes.toBytes(family));
Result rs = table.get(g);
for (KeyValue kv : rs.raw())
{
System.out.println(-------------------- + new String(kv.getRow()) + ----------------------------);
System.out.println(Column Family: + new String(kv.getFamily()));
System.out.println(Column : + new String(kv.getQualifier()) + t);
System.out.println(value : + new String(kv.getValue()));
}
}
public static void RowKeyFamilyColumn(String tablename, String rowKey, String family, String column)
throws IOException
{
HTable table = new HTable(configuration, tablename);
Get g = new Get(rowKey.getBytes());
g.addColumn(family.getBytes(), column.getBytes());
Result rs = table.get(g);
for (KeyValue kv : rs.raw())
{
System.out.println(-------------------- + new String(kv.getRow()) + ----------------------------);
System.out.println(Column Family: + new String(kv.getFamily()));
System.out.println(Column : + new String(kv.getQualifier()) + t);
System.out.println(value : + new String(kv.getValue()));
}
}
private static final String USAGE = Usage: DiffHbase [-o outfile] tablename infile filterColumns...;
/
Prints the usage message and exists the program.
@param message The message to print first.
/
private static void printUsage(String message) {
System.err.println(message);
System.err.println(USAGE);
throw new RuntimeException(USAGE);
}
private static void PrintId(String id, Result rs){
String value = Bytes.toString( rs.getValue(ColumnUtils.getFamily(info:url), ColumnUtils.getQualifier(info:url)));
if(value == null){
System.out.println( id + \tNULL);
}else{
System.out.println( id + \t + value);
}
}
private static void WriteId(String id, Result rs, FileOutputStream os){
String value = Bytes.toString( rs.getValue(ColumnUtils.getFamily(info:url), ColumnUtils.getQualifier(info:url)));
try{
if(value == null){
os.write( (id + \tNULL\n).getBytes());
}else{
os.write( (id + \t + value + \n).getBytes());
}
}
catch (IOException e) {
e.printStackTrace();
}
}
private static void PrintRow(String id, Result rs){
System.out.println(-------------------- + id + ----------------------------);
for (KeyValue kv : rs.raw())
{
System.out.println(new String(kv.getFamily()) + : + new String(kv.getQualifier()) + : + new String(kv.getValue()));
}
}
public static void main(String[] args) throws Exception
{
if (args.length < 3) {
printUsage(Too few arguments);
}
String outfile = null;
String tablename = args[0];
String dictfile = args[1];
int skilLen = 2;
if( args[0].equals(-o)){
outfile = args[1];
tablename = args[2];
dictfile = args[3];
skilLen = 4;
}
HTable table = new HTable(configuration, tablename);
String[] filterColumns = new String[args.length - skilLen];
System.arraycopy(args, skilLen, filterColumns, 0, args.length - skilLen);
System.out.println(filterColumns: );
for(int i=0; i<filterColumns.length; ++i){
System.out.println(\t + filterColumns[i]);
}
FileOutputStream os = null;
if(outfile != null){
os = new FileOutputStream(outfile);
}
int count = 0;
SimpleDateFormat df = new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);//设置日期格局
File srcFile = new File(dictfile);
FileInputStream in = new FileInputStream(srcFile);
InputStreamReader isr = new InputStreamReader(in);
BufferedReader br = new BufferedReader(isr);
String read = null;
while ((read = br.readLine()) != null) {
String[] split = read.trim().split(\\s); // space split
if( split.length < 1 ){
System.out.println(Error line: + read);
continue;
}
if( ++count % 1000 == 0){
System.out.println(df.format(new Date()) + : + count + rows processed. ); // new Date()为获取当前体系时候
}
// System.out.println(ROWKEY: + split[0]);
Get g = new Get(split[0].getBytes());
Result rs = table.get(g);
if( rs == null){
System.out.println(No Result for + split[0]);
continue;
}
for(int i=0; i<filterColumns.length; ++i){
String value = Bytes.toString(rs.getValue(ColumnUtils.getFamily(filterColumns[i]), ColumnUtils.getQualifier(filterColumns[i])));
if(value == null){
if( os == null){
PrintId(split[0], rs);
}else{
WriteId(split[0], rs, os);
}
// PrintRow(split[0], rs);
break;
}
}
}
br.close();
isr.close();
in.close();
}
}Hbase Mapreduce示例:全库扫描(多量数据):
package com.hbase.mapreduce;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.util.Bytes;
import com.goodhope.utils.ColumnUtils;
public class ExportHbase {
private static final String INFOCATEGORY = info:storecategory;
private static final String USAGE = Usage: ExportHbase +
-r <numReduceTasks> -indexConf <iconfFile>\n +
-indexDir <indexDir> -webSite <amazon> [-need <true> -isVisible -startTime <long>] -table <tableName> -columns <columnName1> +
[<columnName2> ...];
/
Prints the usage message and exists the program.
@param message The message to print first.
/
private static void printUsage(String message) {
System.err.println(message);
System.err.println(USAGE);
throw new RuntimeException(USAGE);
}
/
Creates a new job.
@param conf
@param args The command line arguments.
@throws IOException When reading the configuration fails.
/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
if (args.length < 7) {
printUsage(Too few arguments);
}
int numReduceTasks = 1;
String iconfFile = null;
String indexDir = null;
String tableName = null;
String website = null;
String need = ;
String expectShopGrade = ;
String dino = 6;
String isdebug = 0;
long debugThreshold = 10000;
String debugThresholdStr = Long.toString(debugThreshold);
String queue = offline;
long endTime = Long.MAX_VALUE;
int maxversions = 1;
long startTime = System.currentTimeMillis() - 282460601000l;
long distartTime = System.currentTimeMillis() - 302460601000l;
long diusedTime = System.currentTimeMillis() - 302460601000l;
String startTimeStr = Long.toString(startTime);
String diusedTimeStr = Long.toString(diusedTime);
String quorum = null;
String isVisible = ;
List<String> columns = new ArrayList<String>() ;
boolean bFilter = false;
// parse args
for (int i = 0; i < args.length - 1; i++) {
if (-r.equals(args[i])) {
numReduceTasks = Integer.parseInt(args[++i]);
} else if (-indexConf.equals(args[i])) {
iconfFile = args[++i];
} else if (-indexDir.equals(args[i])) {
indexDir = args[++i];
} else if (-table.equals(args[i])) {
tableName = args[++i];
} else if (-webSite.equals(args[i])) {
website = args[++i];
} else if (-startTime.equals(args[i])) {
startTimeStr = args[++i];
startTime = Long.parseLong(startTimeStr);
} else if (-need.equals(args[i])) {
need = args[++i];
} else if (-isVisible.equals(args[i])) {
isVisible = true;
} else if (-shopgrade.equals(args[i])) {
expectShopGrade = args[++i];
} else if (-queue.equals(args[i])) {
queue = args[++i];
} else if (-dino.equals(args[i])) {
dino = args[++i];
} else if (-maxversions.equals(args[i])) {
maxversions = Integer.parseInt(args[++i]);
} else if (-distartTime.equals(args[i])) {
distartTime = Long.parseLong(args[++i]);
} else if (-diendTime.equals(args[i])) {
endTime = Long.parseLong(args[++i]);
} else if (-diusedTime.equals(args[i])) {
diusedTimeStr = args[++i];
diusedTime = Long.parseLong(diusedTimeStr);
} else if (-quorum.equals(args[i])) {
quorum = args[++i];
} else if (-filter.equals(args[i])) {
bFilter = true;
} else if (-columns.equals(args[i])) {
columns.add(args[++i]);
while (i + 1 < args.length && !args[i + 1].startsWith(-)) {
String columnname = args[++i];
columns.add(columnname);
System.out.println(args column----: + columnname);
}
} else if (-debugThreshold.equals(args[i])) {
isdebug = 1;
debugThresholdStr = args[++i];
debugThreshold = Long.parseLong( debugThresholdStr );
}
else {
printUsage(Unsupported option + args[i]);
}
}
if (distartTime > endTime) {
printUsage(distartTime must <= diendTime);
}
if (indexDir == null || tableName == null || columns.isEmpty()) {
printUsage(Index directory, table name and at least one column must +
be specified);
}
if (iconfFile != null) {
// set index configuration content a file
String content = readContent(iconfFile);
conf.set(hbase.index.conf, content);
conf.set(hbase.website.name, website);
conf.set(hbase.need.productDB, need);
conf.set(hbase.expect.shopgrade, expectShopGrade);
conf.set(hbase.di.no, dino);
conf.set(hbase.expect.item.visible, isVisible);
conf.set(hbase.index.startTime, startTimeStr);
conf.set(hbase.index.diusedTime, diusedTimeStr);
conf.set(hbase.index.debugThreshold, debugThresholdStr);
conf.set(hbase.index.debug, isdebug);
if (quorum != null) {
conf.set(hbase.zookeeper.quorum, quorum);
}
String temp = ;
for (String column : columns) {
temp = temp + column + |;
}
temp = temp.substring(0, temp.length() - 1);
conf.set(hbase.index.column, temp);
System.out.println(hbase.index.column: + temp);
}
Job job = new Job(conf, export data table + tableName);
((JobConf) job.getConfiguration()).setQueueName(queue);
// number of indexes to partition into
job.setNumReduceTasks(numReduceTasks);
Scan scan = new Scan();
scan.setCacheBlocks(false);
// limit scan range
scan.setTimeRange(distartTime, endTime);
// scan.setMaxVersions(maxversions);
scan.setMaxVersions(1);
/ limit scan columns /
for (String column : columns) {
scan.addColumn(ColumnUtils.getFamily(column), ColumnUtils.getQualifier(column));
scan.addFamily(ColumnUtils.getFamily(column));
}
// set filter
if( bFilter ){
System.out.println(only export guangtaobao data. );
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes(info),
Bytes.toBytes(producttype),
CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes(guangtaobao)) );
filter.setFilterIfMissing(true);
scan.setFilter(filter);
}
TableMapReduceUtil.initTableMapperJob(tableName, scan, ExportHbaseMapper.class,
Text.class, Text.class, job);
// job.setReducerClass(ExportHbaseReducer.class);
FileOutputFormat.setOutputPath(job, new Path(indexDir));
return job;
}
/
Reads xml file of indexing configurations. The xml format is similar to
hbase-default.xml and hadoop-default.xml. For an example configuration,
see the <code>createIndexConfContent</code> method in TestTableIndex.
@param fileName The file to read.
@return XML configuration read file.
@throws IOException When the XML is broken.
/
private static String readContent(String fileName) throws IOException {
File file = new File(fileName);
int length = (int) file.length();
if (length == 0) {
printUsage(Index configuration file + fileName + does not exist);
}
int bytesRead = 0;
byte[] bytes = new byte[length];
FileInputStream fis = new FileInputStream(file);
try {
// read entire file into content
while (bytesRead < length) {
int read = fis.read(bytes, bytesRead, length - bytesRead);
if (read > 0) {
bytesRead += read;
} else {
break;
}
}
} finally {
fis.close();
}
return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
}
/
The main entry point.
@param args The command line arguments.
@throws Exception When running the job fails.
/
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
//////////////////////////////////////////////////////////
package com.hbase.mapreduce;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.lang.String;
import java.lang.StringBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
import com.goodhope.utils.ColumnUtils;
/
Pass the given key and record as-is to the reduce phase.
/
@SuppressWarnings(deprecation)
public class ExportHbaseMapper extends TableMapper<Text,Text> implements Configurable {
private static final Text keyTEXT = new Text();
private static final Text SENDTEXT = new Text();
private Configuration conf = null;
private long startTime = 0;
List<String> columnMap = null;
private long rCount = 0;
private long errCount = 0;
private int debug = 0;
private long thresCount = 10000;
public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
rCount++;
String itemid = Bytes.toString(key.get());
if (itemid.contains(&)) {
context.getCounter(Error, rowkey contains \&\).increment(1);
return;
}
StringBuffer outstr = new StringBuffer();
for (String col : columnMap) {
String tmp = Bytes.toString(value.getValue(ColumnUtils.getFamily(col), ColumnUtils.getQualifier(col)));
if (tmp == null){
context.getCounter(Error, col+ No value in hbase).increment(1);
errCount++;
if( debug > 0 && (errCount % thresCount == 0)){
System.err.println( itemid + : doesnt has + col + data!);
}
outstr.append(NULL + \t);
}else{
if( tmp.contains(guangtaobao) ){
outstr.append(1 + \t);
}else{
outstr.append(tmp.trim() + \t);
}
}
}
if ( ! outstr.toString().isEmpty() ) {
SENDTEXT.set( outstr.toString() );
keyTEXT.set(itemid);
context.write(keyTEXT, SENDTEXT);
if( debug > 0 && (rCount % thresCount10000 == 0)){
System.out.println( SENDTEXT.toString() + keyTEXT.toString() );
}
}
else
{
context.getCounter(Error, No Colume output).increment(1);
return;
}
}
/
Returns the current configuration.
@return The current configuration.
@see org.apache.hadoop.conf.Configurable#getConf()
/
@Override
public Configuration getConf() {
return conf;
}
/
Sets the configuration. This is used to set up the index configuration.
@param configuration
The configuration to set.
@see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
/
@Override
public void setConf(Configuration configuration) {
this.conf = configuration;
startTime = Long.parseLong(conf.get(hbase.index.startTime));
thresCount = Long.parseLong(conf.get(hbase.index.debugThreshold));
debug = Integer.parseInt(conf.get(hbase.index.debug));
String[] columns = conf.get(hbase.index.column).split(\\|);
columnMap = new ArrayList<String>();
for (String column : columns) {
System.out.println(Output column: + column);
columnMap.add(column);
}
}
}
//////////////////////////////////////////////////////////
package com.hbase.utils;
import org.apache.hadoop.hbase.util.Bytes;
public class ColumnUtils {
public static byte[] getFamily(String column){
return getBytes(column, 0);
}
public static byte[] getQualifier(String column){
return getBytes(column, 1);
}
private static byte[] getBytes(String column , int offset){
String[] split = column.split(:);
return Bytes.toBytes(offset > split.length -1 ? split[0] :split[offset]);
}
}