Skip to content

Commit

Permalink
Merge pull request #28 from azaz123/master
Browse files Browse the repository at this point in the history
增加文本协议flush_all命令 #12
  • Loading branch information
yanjunli authored Mar 28, 2017
2 parents 11f2999 + 2474898 commit d3b8f1b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
56 changes: 50 additions & 6 deletions src/main/java/io/mycat/jcache/net/conn/handler/AsciiIOHanlder.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import io.mycat.jcache.enums.DELTA_RESULT_TYPE;
import io.mycat.jcache.enums.Store_item_type;
import io.mycat.jcache.enums.conn.CONN_STATES;
import io.mycat.jcache.enums.protocol.binary.ProtocolBinaryCommand;
import io.mycat.jcache.enums.protocol.binary.ProtocolResponseStatus;
import io.mycat.jcache.net.JcacheGlobalConfig;
import io.mycat.jcache.net.command.Command;
import io.mycat.jcache.net.command.CommandType;
import io.mycat.jcache.net.conn.Connection;
import io.mycat.jcache.setting.Settings;
import io.mycat.jcache.util.BytesUtil;
import io.mycat.jcache.util.ItemUtil;
import io.mycat.jcache.util.UnSafeUtil;
Expand Down Expand Up @@ -54,13 +56,17 @@ public class AsciiIOHanlder implements IOHandler {
private static ByteBuffer INVALID_EXP = ByteBuffer.wrap("CLIENT_ERROR invalid exptime argument\r\n".getBytes());

private static ByteBuffer TOUCHED = ByteBuffer.wrap("TOUCHED\r\n".getBytes());

private static ByteBuffer R_OK = ByteBuffer.wrap("OK\r\n".getBytes());

private static ByteBuffer FLUSH_FORBIDDEN = ByteBuffer.wrap("CLIENT_ERROR flush_all not allowed.\r\n".getBytes());

/**
* 文本协议处理
* TODO 编码/解码部分接口化,公用化 处理
* 鏂囨湰鍗忚澶勭悊
* TODO 缂栫爜/瑙g爜閮ㄥ垎鎺ュ彛鍖栵紝鍏敤鍖� 澶勭悊
* @param conn
* @param buffer
* @return boolean 是否需要继续读取命令, true 继续读取命令,不清空当前缓冲区,false 设置lastMessagePos 执行下一个状态
* @return boolean 鏄惁闇�瑕佺户缁鍙栧懡浠�, true 缁х画璇诲彇鍛戒护,涓嶆竻绌哄綋鍓嶇紦鍐插尯,false 璁剧疆lastMessagePos 鎵ц涓嬩竴涓姸鎬�
* @throws IOException
*/
@Override
Expand Down Expand Up @@ -130,8 +136,8 @@ public void doReadValue(Connection conn,String value) throws IOException{
}

/**
* 命令的解析处理
* TODO 增加 链式处理设计框架
* 鍛戒护鐨勮В鏋愬鐞�
* TODO 澧炲姞 閾惧紡澶勭悊璁捐妗嗘灦
* @param conn
* @param readedLine
*/
Expand Down Expand Up @@ -162,6 +168,8 @@ private void process_command(Connection conn,String readedLine)throws IOExceptio
process_arithmetic_command(conn,params,false);
}else if((len==3||len==4)&&"touch".equals(params[0])){
process_touch_command(conn,params);
}else if(len>=1&&len<=3&&"flush_all".equals(params[0])){
process_flush_command(conn,params);
}else if(len == 1&&"version".equals(params[0])){
out_string(conn,ByteBuffer.wrap(("VERSION "+JcacheGlobalConfig.version).getBytes()));
conn.setWrite_and_go(CONN_STATES.conn_write);
Expand Down Expand Up @@ -258,6 +266,41 @@ private void process_arithmetic_command(Connection conn,String[] params,boolean
conn.setWrite_and_go(CONN_STATES.conn_write);
}

private void process_flush_command(Connection conn,String[] params){
long new_oldest = 0;

if(!Settings.flushEnabled){
out_string(conn,FLUSH_FORBIDDEN);
return;
}

long exptime = 0;
if(params.length >= 2)
{
exptime = Long.parseLong(params[1]);
}


exptime = exptime * 1000L + System.currentTimeMillis();

if(exptime > 0){
new_oldest = ItemUtil.realtime(exptime);
}else{
new_oldest = System.currentTimeMillis();
}

if(Settings.useCas){
Settings.oldestLive = new_oldest - 1000;
if(Settings.oldestLive < System.currentTimeMillis()){
Settings.oldestCas = ItemUtil.get_cas_id();
}
}else{
Settings.oldestLive = new_oldest;
}
out_string(conn, R_OK);
return;
}

private void process_delete_command(Connection conn,String[] params){
String key;
int nkey;
Expand Down Expand Up @@ -316,7 +359,7 @@ private void process_get_command(Connection conn,String[] params,boolean return_
.append(new String(ItemUtil.getValue(it))).append("\r\n");
addWriteQueue(conn,ByteBuffer.wrap(result.toString().getBytes()));
JcacheContext.getItemsAccessManager().item_remove(it); //refcount --;
JcacheContext.getItemsAccessManager().item_update(it); // 更新 最近访问时间
JcacheContext.getItemsAccessManager().item_update(it); // 鏇存柊 鏈�杩戣闂椂闂�
}
}

Expand Down Expand Up @@ -387,6 +430,7 @@ private void process_update_command(Connection conn,String[] params,int comm,boo
conn.setWrite_and_go(CONN_STATES.conn_nread);
}


private void addWriteQueue(Connection conn,ByteBuffer badformat){
ByteBuffer formet = badformat.slice();
formet.position(formet.limit());
Expand Down
Binary file not shown.

0 comments on commit d3b8f1b

Please sign in to comment.