场景大概是:需要记录大量用户的操作Action,需要查询用户最新的行为
采用数据库设计有类似的SQL
SELECT id, name, stamp FROM actions WHERE userid = 1 ORDER BY stamp DESC LIMIT 10 OFFSET 20;
INSERT INTO actions (id, userid, name, stamp) VALUES (newid(), 1, 'Joe User', epoch());
如果采用HBASE设计,ROW设计为<userid><reverse_order_stamp><actionid>
之所以采用row-per-action而不是row-per-user,主要考虑采用row自身的索引进行查询
/**
 * Builds the composite row key for a single user action.
 *
 * <p>The key is the concatenation, in this order, of the {@code Bytes.toBytes}
 * encodings of {@code userid}, {@code stamp} and {@code actionid}.
 * {@code readActionRow} reads them back at offsets 0, 4 and 12.
 *
 * <p>The {@code stamp} is written exactly as given. {@code putUserAction}
 * passes {@code Long.MAX_VALUE - stamp} and {@code readActionRow} undoes
 * that subtraction, so callers that want newest-first ordering must pass
 * the already reversed value.
 *
 * @param userid   id of the user who performed the action
 * @param stamp    timestamp component to embed in the key, stored as-is
 * @param actionid id of the action
 * @return the concatenated row key
 * @throws Exception declared by the original signature
 */
public static byte [] makeActionRow(int userid, long stamp, int actionid)
        throws Exception {
    return Bytes.add(
            Bytes.toBytes(userid),
            Bytes.toBytes(stamp),
            Bytes.toBytes(actionid));
}
/**
 * Decodes a composite row key produced by {@code makeActionRow} back into
 * an {@code Action}.
 *
 * <p>Key layout read by this method: a 4-byte user id at offset 0, an
 * 8-byte stored timestamp at offset 4, and a 4-byte action id at offset 12.
 * The stored timestamp is subtracted from {@code Long.MAX_VALUE} to recover
 * the original value.
 *
 * @param row the row key bytes
 * @return an {@code Action} built from the decoded user id, timestamp and
 *         action id
 * @throws Exception declared by the original signature
 */
public static Action readActionRow(byte [] row)
        throws Exception {
    // Field widths inside the composite key.
    final int intWidth = 4;
    final int longWidth = 8;

    // Field offsets inside the composite key.
    final int userOffset = 0;
    final int stampOffset = userOffset + intWidth;
    final int actionOffset = stampOffset + longWidth;

    final int decodedUser = Bytes.toInt(row, userOffset, intWidth);
    final long storedStamp = Bytes.toLong(row, stampOffset, longWidth);
    final int decodedAction = Bytes.toInt(row, actionOffset, intWidth);

    // Undo the Long.MAX_VALUE - stamp reversal applied before the key was built.
    return new Action(decodedUser, Long.MAX_VALUE - storedStamp, decodedAction);
}
插入操作
public static void putUserAction(Action action) throws Exception {
// Get the fields from the Action object
int userid = action.getUserID();
long stamp = Long.MAX_VALUE - action.getStamp();
int actionid = action.getID();
String name = action.getName();
// Build the composite row, column, and value
byte [] row = makeActionRow(userid,stamp,actionid);
byte [] column = Bytes.toBytes("content:name");
byte [] value = Bytes.toBytes(name);
// Insert to HBase
HTable ht = new HTable("useractions");
BatchUpdate bu = new BatchUpdate(row);
bu.put(column,value)
ht.commit(bu);
}
进行分页查询
/**
 * Returns one page of a user's actions, in row-key order, from the
 * {@code useractions} table.
 *
 * <p>The scan covers the row keys from {@code makeActionRow(userid, 0, 0)}
 * up to {@code makeActionRow(userid, Long.MAX_VALUE, Integer.MAX_VALUE)}.
 * The first {@code offset} rows are skipped, then at most {@code limit}
 * rows are decoded with {@code readActionRow} and given the name read from
 * the {@code content:name} column.
 *
 * @param userid id of the user whose actions are listed
 * @param offset number of leading rows to skip; a value of zero or less
 *               skips nothing
 * @param limit  maximum number of actions to return; zero yields an empty
 *               list without opening a scanner
 * @return the decoded actions, never {@code null}
 * @throws IllegalArgumentException if {@code limit} is negative
 * @throws Exception if the scan or row decoding fails
 */
public static List<Action> getUserActions(int userid, int offset, int limit)
        throws Exception {
    if (limit < 0) {
        throw new IllegalArgumentException("limit must not be negative: " + limit);
    }
    List<Action> actions = new ArrayList<Action>(limit);
    if (limit == 0) {
        // Nothing requested: avoid opening a scanner at all.
        return actions;
    }

    // Initialize startRow, stopRow, and columns to match
    byte [] startRow = makeActionRow(userid, 0, 0);
    byte [] stopRow = makeActionRow(userid, Long.MAX_VALUE, Integer.MAX_VALUE);
    byte [][] columns = {Bytes.toBytes("content:name")};

    // Open Scanner
    HTable ht = new HTable("useractions");
    Scanner s = ht.getScanner(columns, startRow, stopRow);
    try {
        int skipped = 0;
        RowResult res;
        // Stop on the result size rather than on offset + limit, which can
        // overflow int and never matches when limit is zero.
        while (actions.size() < limit && (res = s.next()) != null) {
            // Skip rows until the offset has been consumed
            if (skipped < offset) {
                skipped++;
                continue;
            }

            // Get data from RowResult
            byte [] row = res.getRow();
            byte [] value = res.get(columns[0]).getValue();

            // Build Action
            Action action = readActionRow(row);
            action.setName(Bytes.toString(value));
            actions.add(action);
        }
    } finally {
        // Always release the scanner, even when iteration or decoding fails.
        s.close();
    }
    return actions;
}
![(please configure the [header_logo] section in trac.ini)](http://www1.pconline.com.cn/hr/2009/global/images/logo.gif)