Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加文本协议flush_all命令 #28

Merged
merged 1 commit into from
Mar 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions src/main/java/io/mycat/jcache/net/conn/handler/AsciiIOHanlder.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import io.mycat.jcache.enums.DELTA_RESULT_TYPE;
import io.mycat.jcache.enums.Store_item_type;
import io.mycat.jcache.enums.conn.CONN_STATES;
import io.mycat.jcache.enums.protocol.binary.ProtocolBinaryCommand;
import io.mycat.jcache.enums.protocol.binary.ProtocolResponseStatus;
import io.mycat.jcache.net.JcacheGlobalConfig;
import io.mycat.jcache.net.command.Command;
import io.mycat.jcache.net.command.CommandType;
import io.mycat.jcache.net.conn.Connection;
import io.mycat.jcache.setting.Settings;
import io.mycat.jcache.util.BytesUtil;
import io.mycat.jcache.util.ItemUtil;
import io.mycat.jcache.util.UnSafeUtil;
Expand Down Expand Up @@ -54,13 +56,17 @@ public class AsciiIOHanlder implements IOHandler {
private static ByteBuffer INVALID_EXP = ByteBuffer.wrap("CLIENT_ERROR invalid exptime argument\r\n".getBytes());

private static ByteBuffer TOUCHED = ByteBuffer.wrap("TOUCHED\r\n".getBytes());

private static ByteBuffer R_OK = ByteBuffer.wrap("OK\r\n".getBytes());

private static ByteBuffer FLUSH_FORBIDDEN = ByteBuffer.wrap("CLIENT_ERROR flush_all not allowed.\r\n".getBytes());

/**
* 文本协议处理
* TODO 编码/解码部分接口化,公用化 处理
* 鏂囨湰鍗忚澶勭悊
* TODO 缂栫爜/瑙g爜閮ㄥ垎鎺ュ彛鍖栵紝鍏敤鍖� 澶勭悊
* @param conn
* @param buffer
* @return boolean 是否需要继续读取命令, true 继续读取命令,不清空当前缓冲区,false 设置lastMessagePos 执行下一个状态
* @return boolean 鏄惁闇�瑕佺户缁鍙栧懡浠�, true 缁х画璇诲彇鍛戒护,涓嶆竻绌哄綋鍓嶇紦鍐插尯,false 璁剧疆lastMessagePos 鎵ц涓嬩竴涓姸鎬�
* @throws IOException
*/
@Override
Expand Down Expand Up @@ -130,8 +136,8 @@ public void doReadValue(Connection conn,String value) throws IOException{
}

/**
* 命令的解析处理
* TODO 增加 链式处理设计框架
* 鍛戒护鐨勮В鏋愬鐞�
* TODO 澧炲姞 閾惧紡澶勭悊璁捐妗嗘灦
* @param conn
* @param readedLine
*/
Expand Down Expand Up @@ -162,6 +168,8 @@ private void process_command(Connection conn,String readedLine)throws IOExceptio
process_arithmetic_command(conn,params,false);
}else if((len==3||len==4)&&"touch".equals(params[0])){
process_touch_command(conn,params);
}else if(len>=1&&len<=3&&"flush_all".equals(params[0])){
process_flush_command(conn,params);
}else if(len == 1&&"version".equals(params[0])){
out_string(conn,ByteBuffer.wrap(("VERSION "+JcacheGlobalConfig.version).getBytes()));
conn.setWrite_and_go(CONN_STATES.conn_write);
Expand Down Expand Up @@ -258,6 +266,41 @@ private void process_arithmetic_command(Connection conn,String[] params,boolean
conn.setWrite_and_go(CONN_STATES.conn_write);
}

private void process_flush_command(Connection conn,String[] params){
long new_oldest = 0;

if(!Settings.flushEnabled){
out_string(conn,FLUSH_FORBIDDEN);
return;
}

long exptime = 0;
if(params.length >= 2)
{
exptime = Long.parseLong(params[1]);
}


exptime = exptime * 1000L + System.currentTimeMillis();

if(exptime > 0){
new_oldest = ItemUtil.realtime(exptime);
}else{
new_oldest = System.currentTimeMillis();
}

if(Settings.useCas){
Settings.oldestLive = new_oldest - 1000;
if(Settings.oldestLive < System.currentTimeMillis()){
Settings.oldestCas = ItemUtil.get_cas_id();
}
}else{
Settings.oldestLive = new_oldest;
}
out_string(conn, R_OK);
return;
}

private void process_delete_command(Connection conn,String[] params){
String key;
int nkey;
Expand Down Expand Up @@ -316,7 +359,7 @@ private void process_get_command(Connection conn,String[] params,boolean return_
.append(new String(ItemUtil.getValue(it))).append("\r\n");
addWriteQueue(conn,ByteBuffer.wrap(result.toString().getBytes()));
JcacheContext.getItemsAccessManager().item_remove(it); //refcount --;
JcacheContext.getItemsAccessManager().item_update(it); // 更新 最近访问时间
JcacheContext.getItemsAccessManager().item_update(it); // 鏇存柊 鏈�杩戣闂椂闂�
}
}

Expand Down Expand Up @@ -387,6 +430,7 @@ private void process_update_command(Connection conn,String[] params,int comm,boo
conn.setWrite_and_go(CONN_STATES.conn_nread);
}


private void addWriteQueue(Connection conn,ByteBuffer badformat){
ByteBuffer formet = badformat.slice();
formet.position(formet.limit());
Expand Down
Binary file not shown.