diff --git a/doc/.project b/doc/.project new file mode 100644 index 0000000..f7597a8 --- /dev/null +++ b/doc/.project @@ -0,0 +1,11 @@ + + + doc + + + + + + + + diff --git "a/doc/articles/mycat2.0-\345\220\257\345\212\250\346\265\201\347\250\213.md" "b/doc/articles/mycat2.0-\345\220\257\345\212\250\346\265\201\347\250\213.md" new file mode 100644 index 0000000..641f961 --- /dev/null +++ "b/doc/articles/mycat2.0-\345\220\257\345\212\250\346\265\201\347\250\213.md" @@ -0,0 +1,351 @@ +1. 程序的入口是io.mycat.mycat2.MycatCore. +在main 方法中 首选取得ProxyRuntime的实例,该类是一个单例模式. +2. + +``` +runtime.setConfig(new MycatConfig()); +``` +该方法为runtime设置了一个MycatConfig ,MycatConfig 是负责读取配置文件的. + +3. +``` +ConfigLoader.INSTANCE.loadCore(); +``` + +调用ConfigLoader 进行配置文件的加载. + + 3.1. 加载mycat.yml,heartbeat.yml,cluster.yml,balancer.yml,user.yml 的配置文件.代码如下: + +``` +public void loadCore() throws IOException { + loadConfig(ConfigEnum.PROXY, GlobalBean.INIT_VERSION); + loadConfig(ConfigEnum.HEARTBEAT, GlobalBean.INIT_VERSION); + loadConfig(ConfigEnum.CLUSTER, GlobalBean.INIT_VERSION); + loadConfig(ConfigEnum.BALANCER, GlobalBean.INIT_VERSION); + loadConfig(ConfigEnum.USER, GlobalBean.INIT_VERSION); +} +``` + 3.2 进行配置文件的加载,调用了YamlUtil进行加载,并将配置信息赋值给MycatConfig,关键代码如下 : + +``` + conf.putConfig(configEnum, (Configurable) YamlUtil.load(fileName, configEnum.getClazz()), version); +``` + 3.3 在io.mycat.util.YamlUtil.load(String, Class) 中,处理也很简单,通过Yaml进行文件的加载,然后通过反射机制将属性进行赋值.代码如下: + +``` +/** + * 从指定的文件中加载配置 + * @param fileName 需要加载的文件名 + * @param clazz 加载后需要转换成的类对象 + * @return + * @throws FileNotFoundException + */ +public static T load(String fileName, Class clazz) throws FileNotFoundException { + InputStreamReader fis = null; + try { + URL url = YamlUtil.class.getClassLoader().getResource(fileName); + if (url != null) { + Yaml yaml = new Yaml(); + fis = new InputStreamReader(new FileInputStream(url.getFile()), StandardCharsets.UTF_8); + T obj = yaml.loadAs(fis, clazz); + return obj; + } + return null; + } finally { + if (fis != null) { + try { + fis.close(); + } catch (IOException ignored) { + } + } + } +} +``` +4. 进行通过命令行传入参数的解析.如:可以传入 + - -mycat.proxy.port 8067 + - -mycat.cluster.enable true + - -mycat.cluster.port 9067 + - -mycat.cluster.myNodeId leader-2 + +参数等 +支持的参数可在 io.mycat.mycat2.beans.ArgsBean 中进行查看. + + 获得参数后通过遍历进行赋值操作即可,代码如下: + +``` +private static void solveArgs(String[] args) { +int lenght = args.length; + +MycatConfig conf = ProxyRuntime.INSTANCE.getConfig(); +ProxyConfig proxyConfig = conf.getConfig(ConfigEnum.PROXY); +ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER); +BalancerConfig balancerConfig= conf.getConfig(ConfigEnum.BALANCER); + +for (int i = 0; i < lenght; i++) { + switch(args[i]) { + case ArgsBean.PROXY_PORT: + proxyConfig.getProxy().setPort(Integer.parseInt(args[++i])); + break; + case ArgsBean.CLUSTER_ENABLE: + clusterConfig.getCluster().setEnable(Boolean.parseBoolean(args[++i])); + break; + case ArgsBean.CLUSTER_PORT: + clusterConfig.getCluster().setPort(Integer.parseInt(args[++i])); + break; + case ArgsBean.CLUSTER_MY_NODE_ID: + clusterConfig.getCluster().setMyNodeId(args[++i]); + break; + case ArgsBean.BALANCER_ENABLE: + balancerConfig.getBalancer().setEnable(Boolean.parseBoolean(args[++i])); + break; + case ArgsBean.BALANCER_PORT: + balancerConfig.getBalancer().setPort(Integer.parseInt(args[++i])); + break; + case ArgsBean.BALANCER_STRATEGY: + BalancerBean.BalancerStrategyEnum strategy = BalancerBean.BalancerStrategyEnum.getEnum(args[++i]); + if (strategy == null) { + throw new IllegalArgumentException("no such balancer strategy"); + } + balancerConfig.getBalancer().setStrategy(strategy); + break; + default: + break; + } +} +} +``` + + + +5. 设置NioReactorThreads,线程数目按照cpu的数目而定. + +``` +int cpus = Runtime.getRuntime().availableProcessors(); +runtime.setNioReactorThreads(cpus); +runtime.setReactorThreads(new MycatReactorThread[cpus]); +``` +6.设置MycatSessionManager + +7. 然后调用io.mycat.proxy.ProxyRuntime 的init方法,进行资源的初始化.代码如下: + +``` +public void init() { +//心跳调度独立出来,避免被其他任务影响 +heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); +HeartbeatConfig heartbeatConfig = config.getConfig(ConfigEnum.HEARTBEAT); +timerExecutor = ExecutorUtil.create("Timer", heartbeatConfig.getHeartbeat().getTimerExecutor()); +businessExecutor = ExecutorUtil.create("BusinessExecutor",Runtime.getRuntime().availableProcessors()); +listeningExecutorService = MoreExecutors.listeningDecorator(businessExecutor); +MatchMethodGenerator.initShrinkCharTbl(); +} +``` +7.1. 建立心跳线程, 原因是避免被其他任务影响. + +7.2 建立timerExecutor,线程数目默认为2. + +7.3 建立businessExecutor,线程数目为cpu数目. + +7.4 建立listeningExecutorService, 该ExecutorService将会进行任务的提交给businessExecutor + +7.5 调用io.mycat.mycat2.sqlparser.MatchMethodGenerator#initShrinkCharTbl方法.该方法只是对0-9a-zA-Z的字符进行映射.代码如下: + + +``` +static final byte[] shrinkCharTbl = new byte[96];//为了压缩hash字符映射空间,再次进行转义 +public static void initShrinkCharTbl () { + shrinkCharTbl[0] = 1;//从 $ 开始计算 + IntStream.rangeClosed('0', '9').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'0'+2)); + IntStream.rangeClosed('A', 'Z').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'A'+12)); + IntStream.rangeClosed('a', 'z').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'a'+12)); + shrinkCharTbl['_'-'$'] = (byte)38; +} +``` +8. 调用ProxyStarter的start,首先 是创建了NIOAcceptor,负责对请求进行处理.然后根据ClusterConfig的配置进行相应的处理. + + +``` +ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER); +ClusterBean clusterBean = clusterConfig.getCluster(); +if (clusterBean.isEnable()) { + // 启动集群 + startCluster(runtime, clusterBean, acceptor); +} else { + // 未配置集群,直接启动 + startProxy(true); +} +``` + +9.接下来分析未配置集群的启动方式. + +9.1 首先通过ProxyRuntime获取到ProxyConfig(该对象是mycat.yml的封装),mycat.yml的配置文件如下: + +``` +proxy: + ip: 0.0.0.0 + port: 8066 +``` +因此可以通过该类拿到启动端口和ip.因此传入NIOAcceptor进行监听. + +9.2 在io.mycat.proxy.NIOAcceptor#startServerChannel中,做了如下处理: + +9.2.1 首先检查ServerSocketChannel是否已经启动,如果已经启动,不进行后续处理. + +9.2.2 根据传入的ServerType做不同的处理,当前我们传入的值为MYCAT. 因此不进行处理. + +``` +if (serverType == ServerType.CLUSTER) { + adminSessionMan = new DefaultAdminSessionManager(); + ProxyRuntime.INSTANCE.setAdminSessionManager(adminSessionMan); + logger.info("opend cluster conmunite port on {}:{}", ip, port); +} else if (serverType == ServerType.LOAD_BALANCER){ + logger.info("opend load balance conmunite port on {}:{}", ip, port); + ProxyRuntime.INSTANCE.setProxySessionSessionManager(new ProxySessionManager()); + ProxyRuntime.INSTANCE.setLbSessionSessionManager(new LBSessionManager()); +} +``` + +9.2.3 调用io.mycat.proxy.NIOAcceptor#getServerSocketChannel,进行启动.代码如下: + + +``` +private void openServerChannel(Selector selector, String bindIp, int bindPort, ServerType serverType) + throws IOException { +final ServerSocketChannel serverChannel = ServerSocketChannel.open(); +final InetSocketAddress isa = new InetSocketAddress(bindIp, bindPort); +serverChannel.bind(isa); +serverChannel.configureBlocking(false); +serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); +serverChannel.register(selector, SelectionKey.OP_ACCEPT, serverType); +if (serverType == ServerType.CLUSTER) { + logger.info("open cluster server port on {}:{}", bindIp, bindPort); + clusterServerSocketChannel = serverChannel; +} else if (serverType == ServerType.LOAD_BALANCER) { + logger.info("open load balance server port on {}:{}", bindIp, bindPort); + loadBalanceServerSocketChannel = serverChannel; +} else { + logger.info("open proxy server port on {}:{}", bindIp, bindPort); + proxyServerSocketChannel = serverChannel; +} +} +``` + +9.4 调用io.mycat.mycat2.ProxyStarter#startReactor,进行MycatReactorThread的启动.该线程负责对请求进行处理. + +9.5 加载配置文件信息,该方法在ROOT_PATH目录下创建了prepare和archive两个文件夹,并加载了replica-index.yml,datasource.yml,schema.yml,最后调用了AnnotationProcessor#getInstance 进行初始化. + +9.5.1 创建DynamicAnnotationManager. + +9.5.2 对AnnotationProcessor.class.getClassLoader().getResource("") 的路径进行监听.当文件目录有变化时会回调io.mycat.mycat2.sqlannotations.AnnotationProcessor#listen方法.代码如下: + + +``` +public static void listen() { +try { +while (true) { + WatchKey key = watcher.take();//todo 线程复用,用 poll比较好? + boolean flag = false; + for (WatchEvent event: key.pollEvents()) { + String str = event.context().toString(); + if ("actions.yml".equals(str)|| "annotations.yml".equals(str)) { + flag=true; + break; + } + } + if (flag){ + System.out.println("动态注解更新次数" + count.incrementAndGet()); + init(); + } + boolean valid = key.reset(); + if (!valid) { + break; + } + +} +} catch (Exception e) { +e.printStackTrace(); +} +} +``` + +==可以看到,当actions.yml或者annotations.yml变化时,就会重新调用init方法,从而对DynamicAnnotationManager进行修改.== + + +9.6 分别对datasource.yml 和 schema.yml 进行加载. + +9.6.1 对MySQLRepBean进行初始化.该bean是对datasource.yml的封装.datasource.yml如下所示: + + +``` +replicas: + - name: test # 复制组 名称 必须唯一 + repType: MASTER_SLAVE # 复制类型 + switchType: SWITCH # 切换类型 + balanceType: BALANCE_ALL_READ # 读写分离类型 + mysqls: + - hostName: mysql-01 # mysql 主机名 + ip: 127.0.0.1 # ip + port: 3306 # port + user: root # 用户名 + password: root # 密码 + minCon: 1 # 最小连接 + maxCon: 10 # 最大连接 + maxRetryCount: 3 # 连接重试次数 + +``` + + +``` +conf.getMysqlRepMap().forEach((repName, repBean) -> { + repBean.initMaster(); + repBean.getMetaBeans().forEach(metaBean -> metaBean.prepareHeartBeat(repBean, repBean.getDataSourceInitStatus())); +}); +``` + +9.6.2 调用io.mycat.mycat2.beans.MySQLRepBean#initMaster, 做了如下处理: + +首先加载replica-index.yml,获取replica name 对应的index,并对MySQLRepBean中的metaBeans进行赋值,完成对mysqls 的封装. + +之后调用MySQLMetaBean的prepareHeartBeat方法. +完成 + + +9.7 ==调用io.mycat.proxy.ProxyRuntime#startHeartBeatScheduler,启动heartbeatScheduler线程.该线程会每10000 ms 调用io.mycat.proxy.ProxyRuntime#replicaHeartbeat.== 心跳的配置在heartbeat.yml中,默认配置如下: + + +``` +heartbeat: + timerExecutor: 2 + replicaHeartbeatPeriod: 10000 + replicaIdleCheckPeriod: 2000 + idleTimeout: 2000 + processorCheckPeriod: 2000 + minSwitchtimeInterval: 120000 +``` + +9.7.1 在io.mycat.proxy.ProxyRuntime#replicaHeartbeat,方法中代码如下: + + +``` +private Runnable replicaHeartbeat() { +return ()->{ +ProxyReactorThread reactor = getReactorThreads()[ThreadLocalRandom.current().nextInt(getReactorThreads().length)]; +reactor.addNIOJob(()-> config.getMysqlRepMap().values().stream().forEach(f -> f.doHeartbeat())); +}; +} +``` + +最终会调用io.mycat.mycat2.beans.heartbeat.MySQLHeartbeat#heartbeat. + + +9.8 对cluster的配置进行处理.代码如下: + + +``` +BalancerConfig balancerConfig = conf.getConfig(ConfigEnum.BALANCER); +BalancerBean balancerBean = balancerConfig.getBalancer(); +// 集群模式下才开启负载均衡服务 +if (clusterBean.isEnable() && balancerBean.isEnable()) { + runtime.getAcceptor().startServerChannel(balancerBean.getIp(), balancerBean.getPort(), ServerType.LOAD_BALANCER); +} +``` + diff --git a/source/minimal_sql_tokens.txt b/source/minimal_sql_tokens.txt new file mode 100644 index 0000000..42cce35 --- /dev/null +++ b/source/minimal_sql_tokens.txt @@ -0,0 +1,146 @@ +AS +ID +IF +ON +XA +ALL +END +FOR +NOT +OFF +SET +SQL +USE +XML +CALL +DATA +DROP +DESC +FILE +FROM +HELP +INTO +JOIN +KILL +LOAD +LOCK +LEFT +NAME +REPL +SHOW +SLOW +TIME +USER +ALTER +BEGIN +CLEAR +CACHE +EVENT +GROUP +GRANT +INDEX +LIMIT +LOCAL +MYCAT +ORDER +PROXY +QUERY +RIGHT +ROUTE +START +SUPER +TABLE +UNION +USAGE +WHERE +CATLET +COMMIT +CONFIG +CREATE +DETAIL +DELETE +EXISTS +IGNORE +INFILE +INSERT +ONLINE +RELOAD +RENAME +REVOKE +ROUTER +SELECT +SERVER +SCHEMA +SWITCH +UNLOCK +UPDATE +BACKEND +BALANCE +CHARSET +CURRENT +DB_TYPE +DELAYED +EXECUTE +EXPLAIN +OFFLINE +PREPARE +RECOVER +RELEASE +REPLACE +REPLICA +SESSION +STARTUP +TRIGGER +VERSION +DATABASE +DATANODE +DESCRIBE +ROLLBACK +SHUTDOWN +TRUNCATE +BENCHMARK +COLLATION +HEARTBEAT +PROCEDURE +PROCESSOR +REPL_NAME +ROW_COUNT +SAVEPOINT +CACHE_TIME +CONCURRENT +CONNECTION +DATASOURCE +FOUND_ROWS +REFERENCES +THREADPOOL +REPLICATION +SYSTEM_USER +ACCESS_COUNT +AUTO_REFRESH +CACHE_RESULT +COERCIBILITY +CURRENT_USER +LOW_PRIORITY +SESSION_USER +CONNECTION_ID +HIGH_PRIORITY +LAST_INSERT_ID +REPL_METABEAN_INDEX + + + + + + + + + + + + + + + + + + diff --git a/source/sql_tokens.txt b/source/sql_tokens.txt new file mode 100644 index 0000000..bac4452 --- /dev/null +++ b/source/sql_tokens.txt @@ -0,0 +1,710 @@ +ACCESSIBLE +ACCESS_COUNT +ACCOUNT +ACTION +ADD +ADDDATE +AFTER +AGAINST +AGGREGATE +ALGORITHM +ALL +ALTER +ALWAYS +ANALYSE +ANALYZE +AND +ANY +AS +ASC +ASCII +ASENSITIVE +AT +AUTHORS +AUTOCOMMIT +AUTOEXTEND_SIZE +AUTO_INCREMENT +AUTO_REFRESH +AVG_ROW_LENGTH +AVG +BACKEND +BACKUP +BALANCE +BEFORE +BEGIN +BENCHMARK +BETWEEN +BIGINT +BINARY +BINLOG +BIN_NUM +BIT_AND +BIT_OR +BIT +BIT_XOR +BLOB +BLOCK +BOOLEAN +BOOL +BOTH +BTREE +BY +BYTE +CACHE +CACHE_TIME +CACHE_RESULT +CALL +CASCADE +CASCADED +CASE +CAST +CATALOG_NAME +CATLET +CHAIN +CHANGE +CHANGED +CHANNEL +CHARSET +CHARACTER +CHAR +CHECKSUM +CHECK +CIPHER +CLASS_ORIGIN +CLEAR +CLIENT +CLOSE +COALESCE +CODE +COERCIBILITY +COLLATE +COLLATION +COLUMNS +COLUMN +COLUMN_NAME +COLUMN_FORMAT +COMMENT +COMMITTED +COMMIT +COMPACT +COMPLETION +COMPRESSED +COMPRESSION +CONCURRENT +CONDITION +CONFIG +CONNECTION +CONNECTION_ID +CONSISTENT +CONSTRAINT +CONSTRAINT_CATALOG +CONSTRAINT_NAME +CONSTRAINT_SCHEMA +CONTAINS +CONTEXT +CONTINUE +CONTRIBUTORS +CONVERT +COUNT +CPU +CREATE +CROSS +CUBE +CURDATE +CURRENT +CURRENT_DATE +CURRENT_TIME +CURRENT_TIMESTAMP +CURRENT_USER +CURSOR +CURSOR_NAME +CURTIME +DATABASE +DATABASES +DATAFILE +DATA +DATANODE +DATASOURCE +DATETIME +DATE_ADD +DATE_ADD_INTERVAL +DATE_SUB +DATE_SUB_INTERVAL +DATE +DAYOFMONTH +DAY_HOUR +DAY_MICROSECOND +DAY_MINUTE +DAY_SECOND +DAY +DB_TYPE +DEALLOCATE +DEC +DECIMAL_NUM +DECIMAL +DECLARE +DEFAULT +DEFAULT_AUTH +DEFINER +DELAYED +DELAY_KEY_WRITE +DELETE +DESC +DESCRIBE +DES_KEY_FILE +DETAIL +DETERMINISTIC +DIAGNOSTICS +DIRECTORY +DISABLE +DISCARD +DISK +DISTINCT +DISTINCTROW +DIV +DOUBLE +DO +DROP +DUAL +DUMPFILE +DUPLICATE +DYNAMIC +EACH +ELSE +ELSEIF +ENABLE +ENCLOSED +END +ENDS +END_OF_INPUT +ENGINES +ENGINE +ENUM +ERROR +ERRORS +ESCAPED +ESCAPE +EVENTS +EVENT +EVERY +EXCHANGE +EXECUTE +EXISTS +EXIT +EXPANSION +EXPIRE +EXPLAIN +EXPORT +EXTENDED +EXTENT_SIZE +EXTRACT +FALSE +FAST +FAULTS +FETCH +FIELDS +FILE +FILE_BLOCK_SIZE +FILTER +FIRST +FIXED +FLOAT4 +FLOAT8 +FLOAT +FLUSH +FOLLOWS +FOR +FORCE +FOREIGN +FORMAT +FOUND +FOUND_ROWS +FRAC_SECOND +FROM +FULL +FULLTEXT +FUNCTION +GET +GENERAL +GENERATED +GROUP_REPLICATION +GEOMETRYCOLLECTION +GEOMETRY +GET_FORMAT +GLOBAL +GRANT +GRANTS +GROUP +GROUP_CONCAT +HANDLER +HASH +HAVING +HEARTBEAT +HELP +HIGH_PRIORITY +HOST +HOSTS +HOUR_MICROSECOND +HOUR_MINUTE +HOUR_SECOND +HOUR +ID +IDENTIFIED +IF +IFNULL +IGNORE +IGNORE_SERVER_IDS +IMPORT +INDEXES +INDEX +INFILE +INITIAL_SIZE +INNER +INNODB +INOUT +INSENSITIVE +INSERT +INSERT_METHOD +INSTALL +INTEGER +INTERVAL +INTO +INT +INVOKER +IN +IO_AFTER_GTIDS +IO_BEFORE_GTIDS +IO_THREAD +IO +IPC +IS +ISOLATION +ISSUER +ITERATE +JOIN +JSON +KEYS +KEY_BLOCK_SIZE +KEY +KILL +LANGUAGE +LAST +LAST_INSERT_ID +LEADING +LEAVES +LEAVE +LEFT +LESS +LEVEL +LIKE +LIMIT +LINEAR +LINES +LINESTRING +LIST +LOAD +LOCALTIME +LOCALTIMESTAMP +LOCAL +LOCATOR +LOCKS +LOCK +LOGFILE +LOGS +LONGBLOB +LONGTEXT +LONG_NUM +LONG +LOOP +LOW_PRIORITY +MASTER_AUTO_POSITION +MASTER_BIND +MASTER_CONNECT_RETRY +MASTER_DELAY +MASTER_HOST +MASTER_LOG_FILE +MASTER_LOG_POS +MASTER_PASSWORD +MASTER_PORT +MASTER_RETRY_COUNT +MASTER_SERVER_ID +MASTER_SSL_CAPATH +MASTER_SSL_CA +MASTER_SSL_CERT +MASTER_SSL_CIPHER +MASTER_SSL_CRL +MASTER_SSL_CRLPATH +MASTER_SSL_KEY +MASTER_SSL +MASTER_SSL_VERIFY_SERVER_CERT +MASTER +MASTER_USER +MASTER_HEARTBEAT_PERIOD +MATCH +MAX_CONNECTIONS_PER_HOUR +MAX_QUERIES_PER_HOUR +MAX_ROWS +MAX_SIZE +MAX_STATEMENT_TIME +MAX +MAX_UPDATES_PER_HOUR +MAX_USER_CONNECTIONS +MAXVALUE +MEDIUMBLOB +MEDIUMINT +MEDIUMTEXT +MEDIUM +MEMORY +MERGE +MESSAGE_TEXT +MICROSECOND +MID +MIDDLEINT +MIGRATE +MINUTE_MICROSECOND +MINUTE_SECOND +MINUTE +MIN_ROWS +MIN +MODE +MODIFIES +MODIFY +MOD +MONTH +MULTILINESTRING +MULTIPOINT +MULTIPOLYGON +MUTEX +MYCAT +MYSQL_ERRNO +NAMES +NAME +NATIONAL +NATURAL +NCHAR_STRING +NCHAR +NDB +NDBCLUSTER +NEG +NEVER +NEW +NEXT +NODEGROUP +NONE +NONBLOCKING +NOT +NOW +NO +NO_WAIT +NO_WRITE_TO_BINLOG +NULL +NULLIF +NUMBER +NUMERIC +NVARCHAR +OFF +OFFLINE +OFFSET +OLD_PASSWORD +ON +ONE_SHOT +ONE +ONLINE +ONLY +OPEN +OPTIMIZE +OPTIMIZER_COSTS +OPTIONS +OPTION +OPTIONALLY +ORDER +OR +OUTER +OUTFILE +OUT +OWNER +PACK_KEYS +PAGE +PARSER +PARTIAL +PARTITIONING +PARTITIONS +PARTITION +PASSWORD +PHASE +PLUGINS +PLUGIN_DIR +PLUGIN +POINT +POLYGON +PORT +POSITION +PRECEDES +PRECISION +PREPARE +PRESERVE +PREV +PRIMARY +PRIVILEGES +PROCEDURE +PROCESS +PROCESSOR +PROCESSLIST +PROFILE +PROFILES +PROXY +PURGE +QUARTER +QUERY +QUICK +RANGE +READS +READ_ONLY +READ +READ_WRITE +REAL +REBUILD +RECOVER +REDOFILE +REDO_BUFFER_SIZE +REDUNDANT +REFERENCES +REGEXP +RELAY +RELAYLOG +RELAY_LOG_FILE +RELAY_LOG_POS +RELAY_THREAD +RELEASE +RELOAD +REMOVE +RENAME +REORGANIZE +REPAIR +REPEATABLE +REPEAT +REPL +REPL_NAME +REPL_METABEAN_INDEX +REPLACE +REPLICA +REPLICATION +REPLICATE_DO_DB +REPLICATE_IGNORE_DB +REPLICATE_DO_TABLE +REPLICATE_IGNORE_TABLE +REPLICATE_WILD_DO_TABLE +REPLICATE_WILD_IGNORE_TABLE +REPLICATE_REWRITE_DB +REQUIRE +RESET +RESIGNAL +RESTORE +RESTRICT +RESUME +RETURNED_SQLSTATE +RETURNS +RETURN +REVERSE +REVOKE +RIGHT +RLIKE +ROLLBACK +ROLLUP +ROUTE +ROUTER +ROUTINE +ROWS +ROW_COUNT +ROW_FORMAT +ROW +RTREE +SAVEPOINT +SCHEDULE +SCHEMA +SCHEMA_NAME +SCHEMAS +SECOND_MICROSECOND +SECOND +SECURITY +SELECT +SENSITIVE +SEPARATOR +SERIALIZABLE +SERIAL +SESSION +SERVER +SERVER_OPTIONS +SESSION_USER +SET +SET_VAR +SHARE +SHOW +SHUTDOWN +SIGNAL +SIGNED +SIMPLE +SLAVE +SLOW +SMALLINT +SNAPSHOT +SOME +SOCKET +SONAME +SOUNDS +SOURCE +SPATIAL +SPECIFIC +SQLEXCEPTION +SQLSTATE +SQLWARNING +SQL_AFTER_GTIDS +SQL_AFTER_MTS_GAPS +SQL_BEFORE_GTIDS +SQL_BIG_RESULT +SQL_BUFFER_RESULT +SQL_CACHE +SQL_CALC_FOUND_ROWS +SQL_NO_CACHE +SQL_SMALL_RESULT +SQL +SQL_THREAD +SSL +STACKED +STARTING +STARTS +START +STARTUP +STATS_AUTO_RECALC +STATS_PERSISTENT +STATS_SAMPLE_PAGES +STATUS +STDDEV_SAMP +STDDEV +STDDEV_POP +STD +STOP +STORAGE +STORED +STRAIGHT_JOIN +STRING +SUBCLASS_ORIGIN +SUBDATE +SUBJECT +SUBPARTITIONS +SUBPARTITION +SUBSTR +SUBSTRING +SUM +SUPER +SUSPEND +SWAPS +SWITCH +SWITCHES +SYSDATE +SYSTEM_USER +TABLES +TABLESPACE +TABLE_REF_PRIORITY +TABLE +TABLE_CHECKSUM +TABLE_NAME +TEMPORARY +TEMPTABLE +TERMINATED +TEXT +THAN +THEN +THREADPOOL +TIMESTAMP +TIMESTAMP_ADD +TIMESTAMP_DIFF +TIME +TINYBLOB +TINYINT +TINYTEXT +TO +TRAILING +TRANSACTION +TRIGGERS +TRIGGER +TRIM +TRUE +TRUNCATE +TYPES +TYPE +UDF_RETURNS +UNCOMMITTED +UNDEFINED +UNDOFILE +UNDO_BUFFER_SIZE +UNDO +UNICODE +UNINSTALL +UNION +UNIQUE +UNKNOWN +UNLOCK +UNSIGNED +UNTIL +UPDATE +UPGRADE +USAGE +USER_RESOURCES +USER +USE_FRM +USE +USING +UTC_DATE +UTC_TIMESTAMP +UTC_TIME +VALIDATION +VALUES +VALUE +VARBINARY +VARCHAR +VARCHARACTER +VARIABLES +VARIANCE +VARYING +VAR_POP +VAR_SAMP +VERSION +VIEW +VIRTUAL +WAIT +WARNINGS +WEEK +WEIGHT_STRING +WHEN +WHERE +WHILE +WITH +WITH_CUBE +WITH_ROLLUP +WITHOUT +WORK +WRAPPER +WRITE +X509 +XA +XID +XML +XOR +YEAR_MONTH +YEAR +ZEROFILL +INT1 +INT2 +INT3 +INT4 +INT8 +SQL_TSI_FRAC_SECOND +SQL_TSI_SECOND +SQL_TSI_MINUTE +SQL_TSI_HOUR +SQL_TSI_DAY +SQL_TSI_WEEK +SQL_TSI_MONTH +SQL_TSI_QUARTER +SQL_TSI_YEAR diff --git a/source/src/main/java/io/mycat/mycat2/MySQLCommand.java b/source/src/main/java/io/mycat/mycat2/MySQLCommand.java index 1afeb8b..252833e 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLCommand.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLCommand.java @@ -52,19 +52,4 @@ public interface MySQLCommand { * @return 是否完成了应答 */ public boolean procssSQL(MycatSession session) throws IOException; - - /** - * 向客户端响应 错误信息 - * @param session - * @throws IOException - */ - public default void sendErrorMsg(MycatSession session,int errno,String errMsg) throws IOException{ - ErrorPacket errPkg = new ErrorPacket(); - errPkg.packetId = (byte) (session.proxyBuffer.getByte(session.curMSQLPackgInf.startPos - + ParseUtil.mysql_packetHeader_length) + 1); - errPkg.errno = errno; - errPkg.message = errMsg; - session.proxyBuffer.reset(); - session.responseOKOrError(errPkg); - } } diff --git a/source/src/main/java/io/mycat/mycat2/MySQLSession.java b/source/src/main/java/io/mycat/mycat2/MySQLSession.java index 0faf392..cb28d00 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLSession.java @@ -10,6 +10,7 @@ import io.mycat.mycat2.cmds.pkgread.PkgProcess; import io.mycat.mycat2.console.SessionKeyEnum; import io.mycat.proxy.BufferPool; +import io.mycat.util.ErrorCode; /** * 后端MySQL连接 @@ -64,9 +65,6 @@ public void unbindMycatSession() { @Override public void close(boolean normal, String hint) { super.close(normal, hint); - if(this.mycatSession!=null){ - this.mycatSession.unbindBeckend(this); - } } public String getDatabase() { return database; diff --git a/source/src/main/java/io/mycat/mycat2/MycatConfig.java b/source/src/main/java/io/mycat/mycat2/MycatConfig.java index 82d2ea0..2211af3 100644 --- a/source/src/main/java/io/mycat/mycat2/MycatConfig.java +++ b/source/src/main/java/io/mycat/mycat2/MycatConfig.java @@ -15,6 +15,7 @@ public class MycatConfig { // 当前节点所用的配置文件的版本 private Map configVersionMap = new HashMap<>(); private Map configMap = new HashMap<>(); + private Map configUpdateTimeMap = new HashMap<>(); /** * 系统中所有MySQLRepBean的Map @@ -72,6 +73,7 @@ public T getConfig(ConfigEnum configEnum) { public void putConfig(ConfigEnum configEnum, Configurable config, int version) { configMap.put(configEnum, config); configVersionMap.put(configEnum, version); + configUpdateTimeMap.put(configEnum, System.currentTimeMillis()); } public Map getConfigVersionMap() { @@ -80,6 +82,7 @@ public Map getConfigVersionMap() { public void setConfigVersion(ConfigEnum configEnum, int version) { configVersionMap.put(configEnum, version); + configUpdateTimeMap.put(configEnum, System.currentTimeMillis()); } public int getConfigVersion(ConfigEnum configEnum) { @@ -87,6 +90,10 @@ public int getConfigVersion(ConfigEnum configEnum) { return oldVersion == null ? GlobalBean.INIT_VERSION : oldVersion; } + public long getConfigUpdateTime(ConfigEnum configEnum) { + return configUpdateTimeMap.get(configEnum); + } + public Map getMysqlRepMap() { return mysqlRepMap; } diff --git a/source/src/main/java/io/mycat/mycat2/MycatSession.java b/source/src/main/java/io/mycat/mycat2/MycatSession.java index 8f0ba29..d14294c 100644 --- a/source/src/main/java/io/mycat/mycat2/MycatSession.java +++ b/source/src/main/java/io/mycat/mycat2/MycatSession.java @@ -31,6 +31,7 @@ import io.mycat.proxy.MycatReactorThread; import io.mycat.proxy.ProxyRuntime; import io.mycat.util.ErrorCode; +import io.mycat.util.ParseUtil; import io.mycat.util.RandomUtil; /** @@ -164,6 +165,50 @@ public int getBackendConCounts(MySQLMetaBean metaBean) { .filter(f->f.getMySQLMetaBean().equals(metaBean)) .count(); } + + /** + * 关闭后端连接,同时向前端返回错误信息 + * @param mysqlsession + * @param normal + * @param hint + */ + public void closeBackendAndResponseError(MySQLSession mysqlsession,boolean normal, ErrorPacket error)throws IOException{ + unbindBeckend(mysqlsession); + mysqlsession.close(normal, error.message); + takeBufferOwnerOnly(); + responseOKOrError(error); + } + + /** + * 关闭后端连接,同时向前端返回错误信息 + * @param session + * @param mysqlsession + * @param normal + * @param errno + * @param error + * @throws IOException + */ + public void closeBackendAndResponseError(MySQLSession mysqlsession,boolean normal,int errno, String error)throws IOException{ + unbindBeckend(mysqlsession); + mysqlsession.close(normal, error); + takeBufferOwnerOnly(); + sendErrorMsg(errno,error); + } + + /** + * 向客户端响应 错误信息 + * @param session + * @throws IOException + */ + public void sendErrorMsg(int errno,String errMsg) throws IOException{ + ErrorPacket errPkg = new ErrorPacket(); + errPkg.packetId = (byte) (proxyBuffer.getByte(curMSQLPackgInf.startPos + + ParseUtil.mysql_packetHeader_length) + 1); + errPkg.errno = errno; + errPkg.message = errMsg; + proxyBuffer.reset(); + responseOKOrError(errPkg); + } /** * 绑定后端MySQL会话 @@ -239,6 +284,13 @@ public void unbindBackend(MySQLMetaBean mySQLMetaBean,String reason){ curBackend = null; } } + + public void takeBufferOwnerOnly(){ + this.curBufOwner = true; + if (this.curBackend != null) { + curBackend.setCurBufOwner(false); + } + } /** * 获取ProxyBuffer控制权,同时设置感兴趣的事件,如SocketRead,Write,只能其一 diff --git a/source/src/main/java/io/mycat/mycat2/advice/Invocation.java b/source/src/main/java/io/mycat/mycat2/advice/Invocation.java deleted file mode 100644 index d7bf936..0000000 --- a/source/src/main/java/io/mycat/mycat2/advice/Invocation.java +++ /dev/null @@ -1,8 +0,0 @@ -package io.mycat.mycat2.advice; - -import io.mycat.mycat2.MySQLCommand; - -public interface Invocation extends MySQLCommand { - - public void setCommand(MySQLCommand command); -} diff --git a/source/src/main/java/io/mycat/mycat2/beans/conf/ReplicaBean.java b/source/src/main/java/io/mycat/mycat2/beans/conf/ReplicaBean.java index 3e1c3f5..0353706 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/conf/ReplicaBean.java +++ b/source/src/main/java/io/mycat/mycat2/beans/conf/ReplicaBean.java @@ -16,7 +16,7 @@ public enum BalanceTypeEnum { } public enum RepSwitchTypeEnum { - NOT_SWITCH, DEFAULT_SWITCH, SYN_STATUS_SWITCH, CLUSTER_STATUS_SWITCH; + NOT_SWITCH, SWITCH; } public enum RepTypeEnum { @@ -25,9 +25,7 @@ public enum RepTypeEnum { // 普通主从 MASTER_SLAVE(GlobalBean.MASTER_SLAVE_HEARTBEAT_SQL, GlobalBean.MYSQL_SLAVE_STAUTS_COLMS), // 普通基于garela cluster集群 - GARELA_CLUSTER(GlobalBean.GARELA_CLUSTER_HEARTBEAT_SQL, GlobalBean.MYSQL_CLUSTER_STAUTS_COLMS), - // 基于MGR集群 - GROUP_REPLICATION(GlobalBean.GROUP_REPLICATION_HEARTBEAT_SQL, GlobalBean.MYSQL_SLAVE_STAUTS_COLMS); + GARELA_CLUSTER(GlobalBean.GARELA_CLUSTER_HEARTBEAT_SQL, GlobalBean.MYSQL_CLUSTER_STAUTS_COLMS); private String hearbeatSQL; private String[] fetchColms; diff --git a/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLDetector.java b/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLDetector.java index 005001a..9df077e 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLDetector.java +++ b/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLDetector.java @@ -28,6 +28,7 @@ import io.mycat.mycat2.net.DefaultMycatSessionHandler; import io.mycat.mycat2.tasks.BackendHeartbeatTask; +import io.mycat.mysql.packet.ErrorPacket; import io.mycat.proxy.MycatReactorThread; import io.mycat.util.TimeUtil; @@ -89,6 +90,7 @@ public void heartbeat() throws IOException { optSession.setCurNIOHandler(heartbeatTask); heartbeatTask.doHeartbeat(); }else{ + optSession.close(false, ((ErrorPacket)rv).message); //连接创建 失败. 如果是主节点,需要重试.并在达到重试次数后,通知集群 if(heartbeat.incrErrorCount() < heartbeat.getSource().getDsMetaBean().getMaxRetryCount()){ heartbeat(); diff --git a/source/src/main/java/io/mycat/mycat2/cmds/ComFieldListCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/ComFieldListCmd.java index 225298b..5d91e56 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/ComFieldListCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/ComFieldListCmd.java @@ -48,21 +48,25 @@ public boolean procssSQL(MycatSession session) throws IOException { session.clearReadWriteOpts(); session.getBackend((mysqlsession, sender, success,result)->{ + ProxyBuffer curBuffer = session.proxyBuffer; + // 切换 buffer 读写状态 + curBuffer.flip(); + if(success){ mysqlsession.currPkgProc = PkgResultSetReader.INSTANCE; mysqlsession.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_COLUMN_OVER.getKey(), true); - - ProxyBuffer curBuffer = session.proxyBuffer; - // 切换 buffer 读写状态 - curBuffer.flip(); // 没有读取,直接透传时,需要指定 透传的数据 截止位置 curBuffer.readIndex = curBuffer.writeIndex; // 改变 owner,对端Session获取,并且感兴趣写事件 session.giveupOwner(SelectionKey.OP_WRITE); - mysqlsession.writeToChannel(); + try { + mysqlsession.writeToChannel(); + } catch (IOException e) { + session.closeBackendAndResponseError(mysqlsession,success,((ErrorPacket) result)); + } }else{ - session.responseOKOrError((ErrorPacket)result); + session.closeBackendAndResponseError(mysqlsession,success,((ErrorPacket) result)); } }); return false; diff --git a/source/src/main/java/io/mycat/mycat2/cmds/DefaultInvocation.java b/source/src/main/java/io/mycat/mycat2/cmds/ComPingCmd.java similarity index 51% rename from source/src/main/java/io/mycat/mycat2/cmds/DefaultInvocation.java rename to source/src/main/java/io/mycat/mycat2/cmds/ComPingCmd.java index 0c158dc..be3cd86 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/DefaultInvocation.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/ComPingCmd.java @@ -2,59 +2,71 @@ import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.mycat.mycat2.MySQLCommand; import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; -import io.mycat.mycat2.advice.Invocation; - +import io.mycat.mysql.packet.OKPacket; /** - * 默认的mysql命令实现 + * COM_PING: + check if the server is alive + Returns + OK_Packet + Payload + 1 [0e] COM_PING * @author yanjunli * */ -public class DefaultInvocation implements Invocation{ +public class ComPingCmd implements MySQLCommand{ - protected MySQLCommand command; - - public DefaultInvocation(){} + private static final Logger logger = LoggerFactory.getLogger(ComPingCmd.class); + + public static final ComPingCmd INSTANCE = new ComPingCmd(); + private ComPingCmd(){} + @Override - public void setCommand(MySQLCommand command) { - this.command = command; + public boolean procssSQL(MycatSession session) throws IOException { + session.responseOKOrError(OKPacket.OK); + return false; } - + @Override public boolean onBackendResponse(MySQLSession session) throws IOException { - return command.onBackendResponse(session); + // TODO Auto-generated method stub + return false; } @Override public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException { - return command.onBackendClosed(session, normal); + // TODO Auto-generated method stub + return false; } @Override public boolean onFrontWriteFinished(MycatSession session) throws IOException { - return command.onFrontWriteFinished(session); + // TODO Auto-generated method stub + return false; } @Override public boolean onBackendWriteFinished(MySQLSession session) throws IOException { - return command.onBackendWriteFinished(session); + // TODO Auto-generated method stub + return false; } @Override public void clearFrontResouces(MycatSession session, boolean sessionCLosed) { - command.clearFrontResouces(session, sessionCLosed); + // TODO Auto-generated method stub + } @Override public void clearBackendResouces(MySQLSession session, boolean sessionCLosed) { - command.clearBackendResouces(session, sessionCLosed); + // TODO Auto-generated method stub + } - @Override - public boolean procssSQL(MycatSession session) throws IOException { - return command.procssSQL(session); - } } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/ComStatisticsCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/ComStatisticsCmd.java new file mode 100644 index 0000000..d4906c1 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/ComStatisticsCmd.java @@ -0,0 +1,62 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; +import java.nio.channels.SelectionKey; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.cmds.pkgread.PkgResultSetReader; +import io.mycat.mycat2.console.SessionKeyEnum; +import io.mycat.mysql.packet.ErrorPacket; +import io.mycat.proxy.ProxyBuffer; + +/** + * Get a human readable string of internal statistics. + COM_STATISTICS: + get a list of active threads + Returns + string.EOF + * @author yanjunli + * + */ +public class ComStatisticsCmd extends DirectPassthrouhCmd{ + + private static final Logger logger = LoggerFactory.getLogger(ComStatisticsCmd.class); + + public static final ComStatisticsCmd INSTANCE = new ComStatisticsCmd(); + + private ComStatisticsCmd(){} + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + + logger.error("com_statistics command is not implement!!!! please fix it"); + /* + * 获取后端连接可能涉及到异步处理,这里需要先取消前端读写事件 + */ + session.clearReadWriteOpts(); + + session.getBackend((mysqlsession, sender, success,result)->{ + ProxyBuffer curBuffer = session.proxyBuffer; + // 切换 buffer 读写状态 + curBuffer.flip(); + + if(success){ + // 没有读取,直接透传时,需要指定 透传的数据 截止位置 + curBuffer.readIndex = curBuffer.writeIndex; + // 改变 owner,对端Session获取,并且感兴趣写事件 + session.giveupOwner(SelectionKey.OP_WRITE); + try { + mysqlsession.writeToChannel(); + } catch (IOException e) { + session.closeBackendAndResponseError(mysqlsession,success,((ErrorPacket) result)); + } + }else{ + session.closeBackendAndResponseError(mysqlsession,success,((ErrorPacket) result)); + } + }); + return false; + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java index 3aa5071..c3df332 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java @@ -18,6 +18,7 @@ import io.mycat.mysql.packet.ErrorPacket; import io.mycat.mysql.packet.MySQLPacket; import io.mycat.proxy.ProxyBuffer; +import io.mycat.util.ErrorCode; /** * 直接透传命令报文 @@ -51,17 +52,21 @@ public boolean procssSQL(MycatSession session) throws IOException { session.clearReadWriteOpts(); session.getBackend((mysqlsession, sender, success, result) -> { + ProxyBuffer curBuffer = session.proxyBuffer; + // 切换 buffer 读写状态 + curBuffer.flip(); if (success) { - ProxyBuffer curBuffer = session.proxyBuffer; - // 切换 buffer 读写状态 - curBuffer.flip(); // 没有读取,直接透传时,需要指定 透传的数据 截止位置 curBuffer.readIndex = curBuffer.writeIndex; // 改变 owner,对端Session获取,并且感兴趣写事件 session.giveupOwner(SelectionKey.OP_WRITE); - mysqlsession.writeToChannel(); + try { + mysqlsession.writeToChannel(); + } catch (IOException e) { + session.closeBackendAndResponseError(mysqlsession,success,((ErrorPacket) result)); + } } else { - session.responseOKOrError((ErrorPacket) result); + session.closeBackendAndResponseError(mysqlsession,success,((ErrorPacket) result)); } }); return false; diff --git a/source/src/main/java/io/mycat/mycat2/cmds/NotSupportCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/NotSupportCmd.java index 05e5827..44e8c5a 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/NotSupportCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/NotSupportCmd.java @@ -8,6 +8,9 @@ import io.mycat.mycat2.MySQLCommand; import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; +import io.mycat.mysql.packet.ErrorPacket; +import io.mycat.util.ErrorCode; +import io.mycat.util.ParseUtil; public class NotSupportCmd implements MySQLCommand{ @@ -17,8 +20,13 @@ public class NotSupportCmd implements MySQLCommand{ @Override public boolean procssSQL(MycatSession session) throws IOException { - // TODO Auto-generated method stub - return false; + ErrorPacket error = new ErrorPacket(); + error.errno = ErrorCode.ER_BAD_DB_ERROR; + error.packetId = session.proxyBuffer.getByte(session.curMSQLPackgInf.startPos + + ParseUtil.mysql_packetHeader_length); + error.message = " command is not supported"; + session.responseOKOrError(error); + return false; } @Override diff --git a/source/src/main/java/io/mycat/mycat2/cmds/interceptor/SQLCachCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/interceptor/SQLCachCmd.java index 9d0a4b7..d04d21a 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/interceptor/SQLCachCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/interceptor/SQLCachCmd.java @@ -5,7 +5,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; import io.mycat.mycat2.cmds.DefaultMySQLCommand; import io.mycat.mycat2.cmds.cache.directfrontchain.resulttomap.front.CacheExistsCheck; @@ -32,7 +31,7 @@ public boolean procssSQL(MycatSession session) throws IOException { if(BufferSQLContext.SELECT_SQL != context.getSQLType()){ String errmsg = " sqlType is invalid . sqlcache type must be select !"; - sendErrorMsg(session,ErrorCode.ER_INVALID_DEFAULT,errmsg); + session.sendErrorMsg(ErrorCode.ER_INVALID_DEFAULT,errmsg); logger.error(errmsg); return true; } @@ -40,7 +39,7 @@ public boolean procssSQL(MycatSession session) throws IOException { if(BufferSQLContext.ANNOTATION_SQL_CACHE != context.getAnnotationType()){ String errmsg = " annotationType is invalid . annotationType must be ANNOTATION_SQL_CACHE !"; - sendErrorMsg(session,ErrorCode.ER_INVALID_DEFAULT,errmsg); + session.sendErrorMsg(ErrorCode.ER_INVALID_DEFAULT,errmsg); logger.error(errmsg); return true; } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/manager/MycatShowConfigsCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/manager/MycatShowConfigsCmd.java new file mode 100644 index 0000000..7129885 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/manager/MycatShowConfigsCmd.java @@ -0,0 +1,145 @@ +package io.mycat.mycat2.cmds.manager; + +import io.mycat.mycat2.MySQLCommand; +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatConfig; +import io.mycat.mycat2.MycatSession; +import io.mycat.mysql.Fields; +import io.mycat.mysql.packet.EOFPacket; +import io.mycat.mysql.packet.FieldPacket; +import io.mycat.mysql.packet.ResultSetHeaderPacket; +import io.mycat.mysql.packet.RowDataPacket; +import io.mycat.proxy.ConfigEnum; +import io.mycat.proxy.Configurable; +import io.mycat.proxy.ProxyBuffer; +import io.mycat.proxy.ProxyRuntime; +import io.mycat.util.PacketUtil; +import io.mycat.util.YamlUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Objects; + +/** + * mycat管理命令处理类,用于查询当前配置信息 + * + * @date: 12/10/2017 + * @author: gaozhiwen + */ +public class MycatShowConfigsCmd implements MySQLCommand { + private static final Logger LOGGER = LoggerFactory.getLogger(MycatShowConfigsCmd.class); + public static final MycatShowConfigsCmd INSTANCE = new MycatShowConfigsCmd(); + + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + private static final int FIELD_COUNT = 4; + private static final ResultSetHeaderPacket header = PacketUtil.getHeader(FIELD_COUNT); + private static final FieldPacket[] fields = new FieldPacket[FIELD_COUNT]; + private static final EOFPacket eof = new EOFPacket(); + + static { + int i = 0; + byte packetId = 0; + header.packetId = ++packetId; + + fields[i] = PacketUtil.getField("name", Fields.FIELD_TYPE_VAR_STRING); + fields[i++].packetId = ++packetId; + + fields[i] = PacketUtil.getField("version", Fields.FIELD_TYPE_INT24); + fields[i++].packetId = ++packetId; + + fields[i] = PacketUtil.getField("lastUpdateTime", Fields.FIELD_TYPE_LONG); + fields[i++].packetId = ++packetId; + + fields[i] = PacketUtil.getField("content", Fields.FIELD_TYPE_VAR_STRING); + fields[i++].packetId = ++packetId; + + eof.packetId = ++packetId; + } + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + ProxyBuffer buffer = session.proxyBuffer; + buffer.reset(); + // write header + header.write(buffer); + + // write fields + for (FieldPacket field : fields) { + field.write(buffer); + } + + // write eof + eof.write(buffer); + + // write rows + byte packetId = eof.packetId; + MycatConfig conf = ProxyRuntime.INSTANCE.getConfig(); + + DateFormat format = new SimpleDateFormat(DATE_FORMAT); + for (ConfigEnum configEnum : ConfigEnum.values()) { + Configurable confValue = conf.getConfig(configEnum); + if (confValue == null) { + continue; + } + RowDataPacket row = new RowDataPacket(FIELD_COUNT); + row.add(configEnum.name().getBytes()); + row.add(Integer.toString(conf.getConfigVersion(configEnum)).getBytes()); + row.add(format.format(conf.getConfigUpdateTime(configEnum)).getBytes()); + row.add(YamlUtil.dump(confValue).getBytes()); + row.packetId = ++packetId; + row.write(buffer); + } + + // write last eof + EOFPacket lastEof = new EOFPacket(); + lastEof.packetId = ++packetId; + lastEof.write(buffer); + + buffer.flip(); + buffer.readIndex = buffer.writeIndex; + session.writeToChannel(); + return false; + } + + @Override + public boolean onBackendResponse(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onFrontWriteFinished(MycatSession session) throws IOException { + session.proxyBuffer.flip(); + // session.chnageBothReadOpts(); + session.takeOwner(SelectionKey.OP_READ); + return false; + } + + @Override + public boolean onBackendWriteFinished(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void clearFrontResouces(MycatSession session, boolean sessionCLosed) { + // TODO Auto-generated method stub + + } + + @Override + public void clearBackendResouces(MySQLSession session, boolean sessionCLosed) { + // TODO Auto-generated method stub + + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgFirstReader.java b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgFirstReader.java index 4398f30..54d93d1 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgFirstReader.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgFirstReader.java @@ -2,13 +2,17 @@ import java.io.IOException; import java.nio.channels.SelectionKey; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import io.mycat.mycat2.AbstractMySQLSession.CurrPacketType; +import io.mycat.mycat2.MySQLCommand; import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; import io.mycat.mycat2.beans.MySQLPackageInf; +import io.mycat.mycat2.cmds.ComStatisticsCmd; import io.mycat.mycat2.cmds.LoadDataCommand; import io.mycat.mycat2.cmds.judge.DirectTransJudge; import io.mycat.mycat2.cmds.judge.ErrorJudge; @@ -41,13 +45,21 @@ public class PkgFirstReader implements PkgProcess { * 指定需要处理的包类型信息 */ private static final Map JUDGEMAP = new HashMap<>(); + + /** + * 特殊命令报文,不需要判断首包,直接返回。 例如 + */ + private static final List extendCmdPkg = new ArrayList<>(); static { // 用来进行ok包的处理理 JUDGEMAP.put((int) MySQLPacket.OK_PACKET, OkJudge.INSTANCE); // 用来进行error包的处理 JUDGEMAP.put((int) MySQLPacket.ERROR_PACKET, ErrorJudge.INSTANCE); + + extendCmdPkg.add(ComStatisticsCmd.INSTANCE); } + @Override public boolean procssPkg(MySQLSession session) throws IOException { @@ -63,6 +75,10 @@ public boolean procssPkg(MySQLSession session) throws IOException { if (null != pkgTypeEnum && CurrPacketType.Full == pkgTypeEnum) { int pkgType = curMSQLPackgInf.pkgType; + + if(extendCmdPkg.contains(session.getMycatSession().getCmdChain().getCurrentSQLCommand())){ + return false; + } // 如果当前为查询包,则切换到查询的逻辑命令处理 if (QUERY_PKG_START <= pkgType) { diff --git a/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComShutdownCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComShutdownCmd.java new file mode 100644 index 0000000..88c94c6 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComShutdownCmd.java @@ -0,0 +1,39 @@ +package io.mycat.mycat2.cmds.sqlCmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.mysql.packet.OKPacket; + +/** + * This statement stops the MySQL server + * @author yanjunli + * + */ +public class SqlComShutdownCmd extends DirectPassthrouhCmd{ + + private static final Logger logger = LoggerFactory.getLogger(SqlComShutdownCmd.class); + + public static final SqlComShutdownCmd INSTANCE = new SqlComShutdownCmd(); + + private SqlComShutdownCmd(){} + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + session.responseOKOrError(OKPacket.OK); + + return false; + } + + @Override + public boolean onFrontWriteFinished(MycatSession session) throws IOException { + logger.warn("mycat exit. bye"); + System.exit(0); + return true; + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/strategy/AbstractCmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/strategy/AbstractCmdStrategy.java index 1bb9ed0..e244d3b 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/strategy/AbstractCmdStrategy.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/strategy/AbstractCmdStrategy.java @@ -60,7 +60,8 @@ final public void matchMySqlCommand(MycatSession session) { int rowDataIndex = session.curMSQLPackgInf.startPos + MySQLPacket.packetHeaderSize +1 ; int length = session.curMSQLPackgInf.pkgLength - MySQLPacket.packetHeaderSize - 1 ; parser.parse(session.proxyBuffer.getBuffer(), rowDataIndex, length, session.sqlContext); - command = MYSQLCOMMANDMAP.get(session.sqlContext.getSQLType()); + byte sqltype = session.sqlContext.getSQLType()!=0?session.sqlContext.getSQLType():session.sqlContext.getCurSQLType(); + command = MYSQLCOMMANDMAP.get(sqltype); }else{ command = MYCOMMANDMAP.get((byte)session.curMSQLPackgInf.pkgType); } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBInOneServerCmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBInOneServerCmdStrategy.java index 4067ce7..eec75a6 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBInOneServerCmdStrategy.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBInOneServerCmdStrategy.java @@ -3,13 +3,17 @@ import io.mycat.mycat2.cmds.ComChangeUserCmd; import io.mycat.mycat2.cmds.ComFieldListCmd; import io.mycat.mycat2.cmds.ComInitDB; +import io.mycat.mycat2.cmds.ComPingCmd; import io.mycat.mycat2.cmds.ComQuitCmd; +import io.mycat.mycat2.cmds.ComStatisticsCmd; import io.mycat.mycat2.cmds.DirectPassthrouhCmd; import io.mycat.mycat2.cmds.NotSupportCmd; +import io.mycat.mycat2.cmds.manager.MycatShowConfigsCmd; import io.mycat.mycat2.cmds.manager.MycatSwitchReplCmd; import io.mycat.mycat2.cmds.sqlCmds.SqlComBeginCmd; import io.mycat.mycat2.cmds.sqlCmds.SqlComCommitCmd; import io.mycat.mycat2.cmds.sqlCmds.SqlComRollBackCmd; +import io.mycat.mycat2.cmds.sqlCmds.SqlComShutdownCmd; import io.mycat.mycat2.cmds.sqlCmds.SqlComStartCmd; import io.mycat.mycat2.sqlparser.BufferSQLContext; import io.mycat.mysql.packet.MySQLPacket; @@ -28,12 +32,12 @@ protected void initMyCmdHandler() { MYCOMMANDMAP.put(MySQLPacket.COM_DROP_DB, NotSupportCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_REFRESH, DirectPassthrouhCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_SHUTDOWN, NotSupportCmd.INSTANCE); - MYCOMMANDMAP.put(MySQLPacket.COM_STATISTICS, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STATISTICS, ComStatisticsCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_PROCESS_INFO, DirectPassthrouhCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_CONNECT, NotSupportCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_PROCESS_KILL, DirectPassthrouhCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_DEBUG, NotSupportCmd.INSTANCE); - MYCOMMANDMAP.put(MySQLPacket.COM_PING, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_PING, ComPingCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_TIME, NotSupportCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_DELAYED_INSERT, NotSupportCmd.INSTANCE); MYCOMMANDMAP.put(MySQLPacket.COM_CHANGE_USER, ComChangeUserCmd.INSTANCE); @@ -63,7 +67,8 @@ protected void initMySqlCmdHandler() { MYSQLCOMMANDMAP.put(BufferSQLContext.BEGIN_SQL, SqlComBeginCmd.INSTANCE); MYSQLCOMMANDMAP.put(BufferSQLContext.START_SQL, SqlComStartCmd.INSTANCE); MYSQLCOMMANDMAP.put(BufferSQLContext.USE_SQL, SqlComStartCmd.INSTANCE); + MYSQLCOMMANDMAP.put(BufferSQLContext.SHUTDOWN_SQL, SqlComShutdownCmd.INSTANCE); MYSQLCOMMANDMAP.put(BufferSQLContext.MYCAT_SWITCH_REPL, MycatSwitchReplCmd.INSTANCE); - + MYSQLCOMMANDMAP.put(BufferSQLContext.MYCAT_SHOW_CONFIGS, MycatShowConfigsCmd.INSTANCE); } } diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java index db5f540..7ae3a6b 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java @@ -1,6 +1,7 @@ package io.mycat.mycat2.net; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import org.slf4j.Logger; @@ -13,6 +14,7 @@ import io.mycat.mycat2.console.SessionKeyEnum; import io.mycat.proxy.NIOHandler; import io.mycat.proxy.ProxyBuffer; +import io.mycat.util.ErrorCode; /** * 负责MycatSession的NIO事件,驱动SQLCommand命令执行,完成SQL的处理过程 @@ -51,15 +53,24 @@ private void onFrontRead(final MycatSession session) throws IOException { // 如果当前包需要处理,则交给对应方法处理,否则直接透传 if(session.getCmdChain().getCurrentSQLCommand().procssSQL(session)){ - session.getCmdChain().getCurrentSQLCommand().clearFrontResouces(session, false); + session.getCmdChain().getCurrentSQLCommand().clearFrontResouces(session, session.isClosed()); } } private void onBackendRead(MySQLSession session) throws IOException { // 交给SQLComand去处理 MySQLCommand curCmd = session.getCmdChain().getCurrentSQLCommand(); - if (curCmd.onBackendResponse(session)) { - curCmd.clearBackendResouces((MySQLSession) session,false); + try{ + if (curCmd.onBackendResponse(session)) { + curCmd.clearBackendResouces(session,session.isClosed()); + } + }catch(ClosedChannelException ex){ + String errmsg = " read backend response error ,backend conn has closed."; + logger.error(errmsg); + session.getMycatSession().closeBackendAndResponseError(session,false,ErrorCode.ERR_CONNECT_SOCKET,errmsg); + } catch (IOException e) { + logger.error(" read backend response error.",e); + session.getMycatSession().closeBackendAndResponseError(session,false,ErrorCode.ERR_CONNECT_SOCKET, e.getMessage()); } } diff --git a/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java b/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java index bfaaa52..8dec705 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java +++ b/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java @@ -25,12 +25,12 @@ public void setActionName(String actionName){ abstract public MySQLCommand getMySQLCommand(); /** - * Ĭϵظ, ݸ÷ȥظ - * Ҫжʵ,Էزֵͬ + * 默认的重复检查, 命令链会根据该方法,进行去重复操作。 + * 如果 需要有多个实例,可以返回不同的值。 * @return */ public long currentKey() { - // . ֻһΣﷵֵͬ + // 结果集缓存. 在责任链中 只允许出现一次,这里返回相同的值 return this.getClass().getSimpleName().hashCode(); } diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLContext.java b/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLContext.java index 3c86275..99c56f3 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLContext.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLContext.java @@ -62,8 +62,12 @@ public class BufferSQLContext { public static final byte XA_START = 40; public static final byte XA_BEGIN = 41; public static final byte XA_END = 42; - + + //admin command public static final byte MYCAT_SWITCH_REPL = 43; + public static final byte MYCAT_SHOW_CONFIGS = 44; + + public static final byte SHUTDOWN_SQL = 45; //ANNOTATION TYPE public static final byte ANNOTATION_BALANCE = 1; @@ -82,6 +86,8 @@ public class BufferSQLContext { private short[] sqlInfoArray; //用于记录sql索引,用于支持sql批量提交,格式 [{hash array start pos, sql type(15-5 hash array real sql offset, 4-0 sql type), tblResult start pos, tblResult count}] private byte totalTblCount; private int[] annotationCondition; + private int[] selectItemArray; + private int selectItemArrayPos; private int tblResultPos; private byte schemaCount; private int schemaResultPos; @@ -109,6 +115,7 @@ public BufferSQLContext() { annotationValue = new long[16]; annotationCondition=new int[64]; myCmdValue = new HashArray(256); + selectItemArray = new int[128]; } public void setCurBuffer(ByteArrayInterface curBuffer) { @@ -123,6 +130,8 @@ public void setCurBuffer(ByteArrayInterface curBuffer) { sqlType = 0; annotationType = 0; Arrays.fill(annotationValue, 0); + Arrays.fill(selectItemArray, 0); + selectItemArrayPos = 0; hasLimit = false; totalSQLCount = 0; limitStart = 0; @@ -385,4 +394,15 @@ public int[] getAnnotationCondition() { public byte getSchemaCount() { return schemaCount; } + + public void setSelectItem(int functionHash) { + selectItemArray[selectItemArrayPos++] = functionHash; + } + + public int getSelectItem(int pos) { + if (pos > 128) { + return 0; + } + return selectItemArray[pos]; + } } diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLParser.java b/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLParser.java index c298cac..d97f3af 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLParser.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/BufferSQLParser.java @@ -4,7 +4,6 @@ import io.mycat.mycat2.sqlparser.SQLParseUtils.Tokenizer; import io.mycat.mycat2.sqlparser.byteArrayInterface.*; import io.mycat.mycat2.sqlparser.byteArrayInterface.dcl.DCLSQLParser; -import io.mycat.mycat2.sqlparser.byteArrayInterface.dynamicAnnotation.DynamicAnnotationManagerImpl; import io.mycat.mycat2.sqlparser.byteArrayInterface.mycat.MYCATSQLParser; import java.nio.ByteBuffer; @@ -14,7 +13,6 @@ import org.slf4j.LoggerFactory; import static io.mycat.mycat2.sqlparser.byteArrayInterface.TokenizerUtil.debug; -import static io.mycat.mycat2.sqlparser.byteArrayInterface.TokenizerUtil.debugError; /** * Created by Kaiz on 2017/2/6. @@ -66,17 +64,17 @@ int pickTableNames(int pos, final int arrayCount, BufferSQLContext context) { pos++; while (pos < arrayCount) { type = hashArray.getType(pos); - if (type == Tokenizer.DOT) { + if (type == Tokenizer2.DOT) { ++pos; context.pushSchemaName(pos); //context.setTblNameStart(hashArray.getPos(pos));// TODO: 2017/3/10 可以优化成一个接口 //context.setTblNameSize(hashArray.getSize(pos)); ++pos; - } else if (type == Tokenizer.SEMICOLON) { + } else if (type == Tokenizer2.SEMICOLON) { return pos; - } else if (type == Tokenizer.RIGHT_PARENTHESES || type == Tokenizer.LEFT_PARENTHESES) { + } else if (type == Tokenizer2.RIGHT_PARENTHESES || type == Tokenizer2.LEFT_PARENTHESES) { return ++pos; - } else if (type == Tokenizer.COMMA) { + } else if (type == Tokenizer2.COMMA) { return pickTableNames(++pos, arrayCount, context); } else if ((type = hashArray.getIntHash(pos)) == IntTokenHash.AS) { pos += 2;// TODO: 2017/3/10 二阶段解析需要别名,需要把别名存储下来 @@ -94,17 +92,17 @@ int pickTableNames(int pos, final int arrayCount, BufferSQLContext context) { int pickLimits(int pos, final int arrayCount, BufferSQLContext context) { int minus = 1; - if (hashArray.getType(pos) == Tokenizer.DIGITS) { + if (hashArray.getType(pos) == Tokenizer2.DIGITS) { context.setLimit(); context.setLimitCount(TokenizerUtil.pickNumber(pos, hashArray, sql)); - if (++pos < arrayCount && hashArray.getType(pos) == Tokenizer.COMMA) { + if (++pos < arrayCount && hashArray.getType(pos) == Tokenizer2.COMMA) { context.pushLimitStart(); if (++pos < arrayCount) { - if (hashArray.getType(pos) == Tokenizer.MINUS) { + if (hashArray.getType(pos) == Tokenizer2.MINUS) { minus = -1; ++pos; } - if (hashArray.getType(pos) == Tokenizer.DIGITS) { + if (hashArray.getType(pos) == Tokenizer2.DIGITS) { //// TODO: 2017/3/11 需要完善处理数字部分逻辑 context.setLimitCount(TokenizerUtil.pickNumber(pos, hashArray, sql) * minus); } @@ -197,52 +195,52 @@ int pickAnnotation(int pos, final int arrayCount, BufferSQLContext context) { return pos; case IntTokenHash.DATANODE: context.setAnnotationType(BufferSQLContext.ANNOTATION_DATANODE); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_DATANODE, hashArray.getHash(++pos)); } break; case IntTokenHash.SCHEMA: context.setAnnotationType(BufferSQLContext.ANNOTATION_SCHEMA); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_SCHEMA, hashArray.getHash(++pos)); } break; case IntTokenHash.SQL: context.setAnnotationType(BufferSQLContext.ANNOTATION_SQL); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { } break; case IntTokenHash.CATLET: context.setAnnotationType(BufferSQLContext.ANNOTATION_CATLET); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { } break; case IntTokenHash.DB_TYPE: context.setAnnotationType(BufferSQLContext.ANNOTATION_DB_TYPE); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_DB_TYPE, hashArray.getHash(++pos)); ++pos; } break; case IntTokenHash.ACCESS_COUNT: context.setAnnotationType(BufferSQLContext.ANNOTATION_SQL_CACHE); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_ACCESS_COUNT, TokenizerUtil.pickNumber(++pos, hashArray, sql)); ++pos; } break; case IntTokenHash.AUTO_REFRESH: context.setAnnotationType(BufferSQLContext.ANNOTATION_SQL_CACHE); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_AUTO_REFRESH, hashArray.getHash(++pos)); ++pos; } break; case IntTokenHash.CACHE_TIME: context.setAnnotationType(BufferSQLContext.ANNOTATION_SQL_CACHE); - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_CACHE_TIME, TokenizerUtil.pickNumber(++pos, hashArray, sql)); ++pos; } @@ -255,7 +253,7 @@ int pickAnnotation(int pos, final int arrayCount, BufferSQLContext context) { if (hashArray.getHash(pos) == TokenHash.BALANCE) { context.setAnnotationType(BufferSQLContext.ANNOTATION_BALANCE); if (hashArray.getHash(++pos)==TokenHash.TYPE) { - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_BALANCE, hashArray.getHash(++pos)); ++pos; } @@ -265,7 +263,7 @@ int pickAnnotation(int pos, final int arrayCount, BufferSQLContext context) { case IntTokenHash.REPLICA: context.setAnnotationType(BufferSQLContext.ANNOTATION_REPLICA_NAME); if (hashArray.getHash(++pos)==TokenHash.NAME) { - if (hashArray.getType(++pos) == Tokenizer.EQUAL) { + if (hashArray.getType(++pos) == Tokenizer2.EQUAL) { context.setAnnotationValue(BufferSQLContext.ANNOTATION_REPLICA_NAME, hashArray.getHash(++pos)); ++pos; } @@ -329,9 +327,10 @@ int pickLoad(int pos, final int arrayCount, BufferSQLContext context) { return pos; } - /* - * 用于进行第一遍处理,处理sql类型以及提取表名 - */ + + /** + * 用于进行第一遍处理,处理sql类型以及提取表名 + **/ public void firstParse(BufferSQLContext context) { final int arrayCount = hashArray.getCount(); int pos = 0; @@ -389,7 +388,7 @@ public void firstParse(BufferSQLContext context) { case IntTokenHash.SELECT: if (hashArray.getHash(pos) == TokenHash.SELECT) { context.setSQLType(BufferSQLContext.SELECT_SQL); - pos++; + pos = SelectItemsParser.pickItemList(++pos, arrayCount, hashArray, context); } break; case IntTokenHash.SHOW: @@ -588,6 +587,12 @@ public void firstParse(BufferSQLContext context) { pos = MYCATSQLParser.pickMycat(++pos, arrayCount, context, hashArray, sql); break; } + case IntTokenHash.SHUTDOWN: + if (hashArray.getHash(pos) == TokenHash.SHUTDOWN) { + context.setSQLType(BufferSQLContext.SHUTDOWN_SQL); + pos++; + } + break; default: // debugError(pos, context); pos++; @@ -667,8 +672,8 @@ public static void main(String[] args) { // byte[] defaultByteArray = "/*!MyCAT:DB_Type=Master*/select * from tbl_A where id=1;".getBytes(StandardCharsets.UTF_8); // byte[] defaultByteArray = "insert tbl_A(id, val) values(1, 2);\ninsert tbl_B(id, val) values(2, 2);\nSELECT id, val FROM tbl_S where id=19;\n".getBytes(StandardCharsets.UTF_8); -// ByteArrayInterface src = new DefaultByteArray("/* mycat:balance*/select * into tbl_B from tbl_A;".getBytes()); - ByteArrayInterface src = new DefaultByteArray("select 121345678;".getBytes()); + ByteArrayInterface src = new DefaultByteArray("/* mycat:balance*/select * into tbl_B from tbl_A;".getBytes()); +// ByteArrayInterface src = new DefaultByteArray("select VERSION(), USER(), id from tbl_A;".getBytes()); // ByteArrayInterface src = new DefaultByteArray("select * into tbl_B from tbl_A;".getBytes()); // long min = 0; // for (int i = 0; i < 50; i++) { @@ -682,7 +687,9 @@ public static void main(String[] args) { // System.out.print("min time : " + min); parser.parse(src, context); System.out.println(context.getSQLCount()); - IntStream.range(0, context.getTableCount()).forEach(i -> System.out.println(context.getSchemaName(i) + '.' + context.getTableName(i))); + System.out.println(context.getSelectItem(0)); + System.out.println(context.getSelectItem(1)); + //IntStream.range(0, context.getTableCount()).forEach(i -> System.out.println(context.getSchemaName(i) + '.' + context.getTableName(i))); //System.out.print("token count : "+parser.hashArray.getCount()); } diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/FunctionHash.java b/source/src/main/java/io/mycat/mycat2/sqlparser/FunctionHash.java new file mode 100644 index 0000000..810b4bb --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/FunctionHash.java @@ -0,0 +1,20 @@ +package io.mycat.mycat2.sqlparser; + +public class FunctionHash { + public static final int USER = 1; + public static final int SCHEMA = 2; + public static final int CHARSET = 3; + public static final int VERSION = 4; + public static final int DATABASE = 5; + public static final int BENCHMARK = 6; + public static final int COLLATION = 7; + public static final int ROW_COUNT = 8; + public static final int FOUND_ROWS = 9; + public static final int SYSTEM_USER = 10; + public static final int COERCIBILITY = 11; + public static final int CURRENT_USER = 12; + public static final int SESSION_USER = 13; + public static final int CONNECTION_ID = 14; + public static final int LAST_INSERT_ID = 15; + +} diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/IntTokenHash.java b/source/src/main/java/io/mycat/mycat2/sqlparser/IntTokenHash.java index 13421a9..a5b04be 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/IntTokenHash.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/IntTokenHash.java @@ -6,17 +6,17 @@ public class IntTokenHash { // generate by DCLSQLParserHelper - public final static int ALL = -1650851837; - public final static int FILE = -860749820; +// public final static int ALL = -1650851837; +// public final static int FILE = -860749820; public final static int PROCESS = 1234436103; - public final static int PROXY = -180420603; - public final static int REFERENCES = -367263734; - public final static int EVENT=-992346107; - public final static int REPLICATION = 272760843; - public final static int SHUTDOWN = 524550152; - public final static int SUPER = -1079377915; - public final static int TRIGGER = 1295319047; - public final static int USAGE = 1964179461; +// public final static int PROXY = -180420603; +// public final static int REFERENCES = -367263734; +// public final static int EVENT=-992346107; +// public final static int REPLICATION = 272760843; +// public final static int SHUTDOWN = 524550152; +// public final static int SUPER = -1079377915; +// public final static int TRIGGER = 1295319047; +// public final static int USAGE = 1964179461; public static final int ANNOTATION_BALANCE = 0x00140001; @@ -25,113 +25,133 @@ public class IntTokenHash { public static final int SQL_DELIMETER = 0x000c0001; //generate by MatchMethodGenerator.GenerateSqlTokenHash - public static final int AS = 0x46580002; - public static final int ID = 0x47670002; - public static final int IF = 0x47650002; - public static final int ON = 0x409e0002; - public static final int OFF = 0xca9a6260; - public static final int FOR = 0x576e0003; - public static final int NOT = 0x68b00003; - public static final int SET = 0x52330003; - public static final int SQL = 0x53820003; - public static final int USE = 0x0f840003; - public static final int XML = 0x7fe20003; - public static final int CALL = 0x25d00004; - public static final int DATA = 0x30b50004; - public static final int DROP = 0xba5d0004; - public static final int DESC = 0xc08f0004; - public static final int FROM = 0x992e0004; - public static final int HELP = 0xd4e30004; - public static final int INTO = 0xb2e70004; - public static final int JOIN = 0xf9700004; - public static final int KILL = 0x5f7f0004; - public static final int LOAD = 0xed8b0004; - public static final int LOCK = 0xed330004; - public static final int LEFT = 0xf4e50004; - public static final int NAME = 0x33200004; - public static final int SHOW = 0xb0300004; - public static final int SLOW = 0xa3340004; - public static final int TIME = 0x5f050004; - public static final int USER = 0x7bfa0004; - public static final int ALTER = 0xb2db0005; - public static final int BEGIN = 0xf8f50005; - public static final int CLEAR = 0x19bf0005; - public static final int CACHE = 0xf61c0005; - public static final int GROUP = 0x43f50005; - public static final int GRANT = 0x29c20005; - public static final int INDEX = 0xfcff0005; - public static final int LIMIT = 0x450a0005; - public static final int LOCAL = 0x7a200005; - public static final int MYCAT = 0xdce50005; - public static final int QUERY = 0xdf8c0005; - public static final int ORDER = 0x88da0005; - public static final int RIGHT = 0xec610005; - public static final int ROUTE = 0x82ef0005; - public static final int START = 0xbf880005; - public static final int TABLE = 0x03780005; - public static final int UNION = 0xcf9d0005; - public static final int WHERE = 0x75950005; - public static final int CATLET = 0xda600006; - public static final int COMMIT = 0xac850006; - public static final int CONFIG = 0xebd60006; - public static final int CREATE = 0xdde00006; - public static final int DETAIL = 0x47690006; - public static final int DELETE = 0x33e50006; - public static final int EXISTS = 0x2ed70006; - public static final int IGNORE = 0x6c360006; - public static final int INFILE = 0xba240006; - public static final int INSERT = 0xa3b10006; - public static final int ONLINE = 0x05bc0006; - public static final int RELOAD = 0xd7200006; - public static final int RENAME = 0xe6c60006; - public static final int REVOKE = 0x5cc20006; - public static final int ROUTER = 0x3c570006; - public static final int SELECT = 0x02a10006; - public static final int SERVER = 0x31fa0006; - public static final int SCHEMA = 0x0c490006; - public static final int SWITCH = 0x85dfe050; - public static final int UPDATE = 0x62840006; - public static final int BACKEND = 0x9c0b0007; - public static final int BALANCE = 0x8d070007; - public static final int CURRENT = 0x6af60007; - public static final int DB_TYPE = 0xac610007; - public static final int DELAYED = 0x56380007; - public static final int EXECUTE = 0x304b0007; - public static final int EXPLAIN = 0xe41f0007; - public static final int OFFLINE = 0x9ead0007; - public static final int REPLACE = 0xa88c0007; - public static final int SESSION = 0xf2080007; - public static final int STARTUP = 0x16770007; - public static final int VERSION = 0x66380007; - public static final int DATABASE = 0xbbcc0008; - public static final int DATANODE = 0x045d0008; - public static final int DESCRIBE = 0x8ad30008; - public static final int ROLLBACK = 0xe7390008; - public static final int TRUNCATE = 0xcb5d0008; - public static final int HEARTBEAT = 0xabb70009; - public static final int PROCEDURE = 0x11600009; - public static final int PROCESSOR = 0xfdbc0009; - public static final int SAVEPOINT = 0xca160009; - public static final int CACHE_TIME = 0x9047000a; - public static final int CONCURRENT = 0x91cf000a; - public static final int CONNECTION = 0x307d000a; - public static final int DATASOURCE = 0x0b35000a; - public static final int THREADPOOL = 0x5a7a000a; - public static final int ACCESS_COUNT = 0xec30000c; - public static final int AUTO_REFRESH = 0x06b8000c; - public static final int LOW_PRIORITY = 0xb805000c; - public static final int HIGH_PRIORITY = 0x7c6f000d; - public static final int RELEASE = 811925511; - public static final int UNLOCK = 1509687302; - public static final int XA = 1097138178; - public static final int END = 1259470851; - public static final int PREPARE = -815464441; - public static final int RECOVER = -405012473; - public static final int REPLICA =-879493113; - public static final int CACHE_RESULT = 2073690123; - - public static final int REPL = 0x32020004; - public static final int REPL_NAME = 0x615d0009; + public static final int AS = 0x46580002; + public static final int ID = 0x47670002; + public static final int IF = 0x47650002; + public static final int ON = 0x409e0002; + public static final int XA = 0x41650002; + public static final int ALL = 0x9d9a0003; + public static final int END = 0x4b120003; + public static final int FOR = 0x576e0003; + public static final int NOT = 0x68b00003; + public static final int OFF = 0x62600003; + public static final int SET = 0x52330003; + public static final int SQL = 0x53820003; + public static final int USE = 0x0f840003; + public static final int XML = 0x7fe20003; + public static final int CALL = 0x25d00004; + public static final int DATA = 0x30b50004; + public static final int DROP = 0xba5d0004; + public static final int DESC = 0xc08f0004; + public static final int FILE = 0xccb20004; + public static final int FROM = 0x992e0004; + public static final int HELP = 0xd4e30004; + public static final int INTO = 0xb2e70004; + public static final int JOIN = 0xf9700004; + public static final int KILL = 0x5f7f0004; + public static final int LOAD = 0xed8b0004; + public static final int LOCK = 0xed330004; + public static final int LEFT = 0xf4e50004; + public static final int NAME = 0x33200004; + public static final int REPL = 0x32020004; + public static final int SHOW = 0xb0300004; + public static final int SLOW = 0xa3340004; + public static final int TIME = 0x5f050004; + public static final int USER = 0x7bfa0004; + public static final int ALTER = 0xb2db0005; + public static final int BEGIN = 0xf8f50005; + public static final int CLEAR = 0x19bf0005; + public static final int CACHE = 0xf61c0005; + public static final int EVENT = 0xc4da0005; + public static final int GROUP = 0x43f50005; + public static final int GRANT = 0x29c20005; + public static final int INDEX = 0xfcff0005; + public static final int LIMIT = 0x450a0005; + public static final int LOCAL = 0x7a200005; + public static final int MYCAT = 0xdce50005; + public static final int ORDER = 0x88da0005; + public static final int PROXY = 0xf53f0005; + public static final int QUERY = 0xdf8c0005; + public static final int RIGHT = 0xec610005; + public static final int ROUTE = 0x82ef0005; + public static final int START = 0xbf880005; + public static final int SUPER = 0xbfaa0005; + public static final int TABLE = 0x03780005; + public static final int UNION = 0xcf9d0005; + public static final int USAGE = 0x75130005; + public static final int WHERE = 0x75950005; + public static final int CATLET = 0xda600006; + public static final int COMMIT = 0xac850006; + public static final int CONFIG = 0xebd60006; + public static final int CREATE = 0xdde00006; + public static final int DETAIL = 0x47690006; + public static final int DELETE = 0x33e50006; + public static final int EXISTS = 0x2ed70006; + public static final int IGNORE = 0x6c360006; + public static final int INFILE = 0xba240006; + public static final int INSERT = 0xa3b10006; + public static final int ONLINE = 0x05bc0006; + public static final int RELOAD = 0xd7200006; + public static final int RENAME = 0xe6c60006; + public static final int REVOKE = 0x5cc20006; + public static final int ROUTER = 0x3c570006; + public static final int SELECT = 0x02a10006; + public static final int SERVER = 0x31fa0006; + public static final int SCHEMA = 0x0c490006; + public static final int SWITCH = 0xe0500006; + public static final int UNLOCK = 0x59fc0006; + public static final int UPDATE = 0x62840006; + public static final int BACKEND = 0x9c0b0007; + public static final int BALANCE = 0x8d070007; + public static final int CHARSET = 0x5b1e0007; + public static final int CURRENT = 0x6af60007; + public static final int DB_TYPE = 0xac610007; + public static final int DELAYED = 0x56380007; + public static final int EXECUTE = 0x304b0007; + public static final int EXPLAIN = 0xe41f0007; + public static final int OFFLINE = 0x9ead0007; + public static final int PREPARE = 0xcf650007; + public static final int RECOVER = 0xe7dc0007; + public static final int RELEASE = 0x30650007; + public static final int REPLACE = 0xa88c0007; + public static final int REPLICA = 0xcb940007; + public static final int SESSION = 0xf2080007; + public static final int STARTUP = 0x16770007; + public static final int TRIGGER = 0x4d350007; + public static final int VERSION = 0x66380007; + public static final int DATABASE = 0xbbcc0008; + public static final int DATANODE = 0x045d0008; + public static final int DESCRIBE = 0x8ad30008; + public static final int ROLLBACK = 0xe7390008; + public static final int SHUTDOWN = 0x1f440008; + public static final int TRUNCATE = 0xcb5d0008; + public static final int BENCHMARK = 0xc8b80009; + public static final int COLLATION = 0x1a6e0009; + public static final int HEARTBEAT = 0xabb70009; + public static final int PROCEDURE = 0x11600009; + public static final int PROCESSOR = 0xfdbc0009; + public static final int REPL_NAME = 0x615d0009; + public static final int ROW_COUNT = 0xdfe30009; + public static final int SAVEPOINT = 0xca160009; + public static final int CACHE_TIME = 0x9047000a; + public static final int CONCURRENT = 0x91cf000a; + public static final int CONNECTION = 0x307d000a; + public static final int DATASOURCE = 0x0b35000a; + public static final int FOUND_ROWS = 0xeeb4000a; + public static final int REFERENCES = 0xea1c000a; + public static final int THREADPOOL = 0x5a7a000a; + public static final int REPLICATION = 0x1042000b; + public static final int SYSTEM_USER = 0x011e000b; + public static final int ACCESS_COUNT = 0xec30000c; + public static final int AUTO_REFRESH = 0x06b8000c; + public static final int CACHE_RESULT = 0xc45b000c; + public static final int COERCIBILITY = 0x247c000c; + public static final int CURRENT_USER = 0x4971000c; + public static final int LOW_PRIORITY = 0xb805000c; + public static final int SESSION_USER = 0x1b9b000c; + public static final int CONNECTION_ID = 0x1b85000d; + public static final int HIGH_PRIORITY = 0x7c6f000d; + public static final int LAST_INSERT_ID = 0x0737000e; public static final int REPL_METABEAN_INDEX = 0xa7b40013; } diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/MatchMethodGenerator.java b/source/src/main/java/io/mycat/mycat2/sqlparser/MatchMethodGenerator.java index 7cf7ebc..f8b42dc 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/MatchMethodGenerator.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/MatchMethodGenerator.java @@ -367,8 +367,8 @@ public static void main(String[] args) { // sqlKeyHastTest("minimal_sql_tokens.txt", s -> genHash(s.toCharArray()), 0x3FL); // run(); // test1(); - GenerateIntTokenHash("minimal_sql_tokens.txt"); -// GenerateLongTokenHash("sql_tokens.txt"); +// GenerateIntTokenHash("minimal_sql_tokens.txt"); + GenerateLongTokenHash("sql_tokens.txt"); // initShrinkCharTbl(); // System.out.format("0x%xL;%n", genHash("dn1".toCharArray())); diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/TokenHash.java b/source/src/main/java/io/mycat/mycat2/sqlparser/TokenHash.java index a1f30d7..970edc2 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/TokenHash.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/TokenHash.java @@ -27,6 +27,7 @@ public class TokenHash { public static final long ASENSITIVE = 0xed1d69ad76119L; public static final long AT = 0x20bL; public static final long AUTHORS = 0xe27d1ca23L; + public static final long AUTOCOMMIT = 0xee305b99db768L; public static final long AUTOEXTEND_SIZE = 0x464aad1de667b2c4L; public static final long AUTO_INCREMENT = 0xe297db9809eea053L; public static final long AUTO_REFRESH = 0x61c0bcdb33d9ba87L; @@ -37,6 +38,7 @@ public class TokenHash { public static final long BALANCE = 0xeb775c883L; public static final long BEFORE = 0x5c8a2cfdL; public static final long BEGIN = 0x241d4bcL; + public static final long BENCHMARK = 0x615a508c3b76L; public static final long BETWEEN = 0xed484eed7L; public static final long BIGINT = 0x5d378abfL; public static final long BINARY = 0x5d3eb36fL; @@ -56,6 +58,7 @@ public class TokenHash { public static final long BYTE = 0xe9d50L; public static final long CACHE = 0x268a383L; public static final long CACHE_TIME = 0x10a23d92583364L; + public static final long CACHE_RESULT = 0x6d39564de6b410c2L; public static final long CALL = 0xf0bb0L; public static final long CASCADE = 0xfd3c63759L; public static final long CASCADED = 0x288eabedd50L; @@ -79,6 +82,7 @@ public class TokenHash { public static final long CLOSE = 0x2748585L; public static final long COALESCE = 0x297ebc66617L; public static final long CODE = 0xf664fL; + public static final long COERCIBILITY = 0x6fd78fb903e0bc4eL; public static final long COLLATE = 0x10334f6c01L; public static final long COLLATION = 0x6a60ec845118L; public static final long COLUMNS = 0x10335930e6L; @@ -95,7 +99,9 @@ public class TokenHash { public static final long CONCURRENT = 0x1109d7275db039L; public static final long CONDITION = 0x6a62ece0553aL; public static final long CONFIG = 0x65295ad0L; + public static final long CONFIGS = 0x10339f8b6eL; public static final long CONNECTION = 0x1109e2e0c5262eL; + public static final long CONNECTION_ID = 0xeb2e99b33504d307L; public static final long CONSISTENT = 0x1109e886ba6c70L; public static final long CONSTRAINT = 0x1109e8d271817bL; public static final long CONSTRAINT_CATALOG = 0x2160d58342133e79L; @@ -219,6 +225,7 @@ public class TokenHash { public static final long FOREIGN = 0x1385ad7ac7L; public static final long FORMAT = 0x79e5364bL; public static final long FOUND = 0x2f92e2bL; + public static final long FOUND_ROWS = 0x148861eb6e43e8L; public static final long FRAC_SECOND = 0x34cc1d80f5a547cL; public static final long FROM = 0x12a378L; public static final long FULL = 0x12b6afL; @@ -251,6 +258,7 @@ public class TokenHash { public static final long ID = 0x343L; public static final long IDENTIFIED = 0x17b2ebf9748803L; public static final long IF = 0x345L; + public static final long IFNULL = 0x8d14a25cL; public static final long IGNORE = 0x8d3f9a06L; public static final long IGNORE_SERVER_IDS = 0x458aa01d4cb6f1bcL; public static final long IMPORT = 0x8e44694dL; @@ -288,6 +296,7 @@ public class TokenHash { public static final long KILL = 0x17aa00L; public static final long LANGUAGE = 0x420e34e9f9cL; public static final long LAST = 0x1883d8L; + public static final long LAST_INSERT_ID = 0xb7d9f84aaa41c6eL; public static final long LEADING = 0x19e08635c1L; public static final long LEAVES = 0xa19357aaL; public static final long LEAVE = 0x3f0dcacL; @@ -396,12 +405,13 @@ public class TokenHash { public static final long NO_WAIT = 0x1c604af94aL; public static final long NO_WRITE_TO_BINLOG = 0xd8e4fba760df3705L; public static final long NULL = 0x1b2077L; + public static final long NULLIF = 0xb22030acL; public static final long NUMBER = 0xb220fba3L; public static final long NUMERIC = 0x1c874bc8c0L; public static final long NVARCHAR = 0x4927586d095L; + public static final long OFF = 0xad84L; public static final long OFFLINE = 0x1d39b08d88L; public static final long OFFSET = 0xb67b1e11L; - public static final long OFF = 0xad84L; public static final long OLD_PASSWORD = 0xcaed8a26ad23f3f9L; public static final long ON = 0x443L; public static final long ONE_SHOT = 0x4b711f6af83L; @@ -480,7 +490,11 @@ public class TokenHash { public static final long REPAIR = 0xcb118875L; public static final long REPEATABLE = 0x2233dc5c8f419bL; public static final long REPEAT = 0xcb11a173L; + public static final long REPL = 0x1eecefL; + public static final long REPL_NAME = 0xd58ef244a352L; + public static final long REPL_METABEAN_INDEX = 0xa7d557d5265cd8ffL; public static final long REPLACE = 0x2085da3551L; + public static final long REPLICA = 0x2085da69d5L; public static final long REPLICATION = 0x57a4f90a076bbf3L; public static final long REPLICATE_DO_DB = 0x676228600fed6775L; public static final long REPLICATE_IGNORE_DB = 0x9c8545f719859952L; @@ -592,8 +606,8 @@ public class TokenHash { public static final long SUPER = 0x52fe2f6L; public static final long SUSPEND = 0x220ffdebdfL; public static final long SWAPS = 0x5319cadL; - public static final long SWITCHES = 0x57681886a32L; public static final long SWITCH = 0xd4fa9944L; + public static final long SWITCHES = 0x57681886a32L; public static final long SYSDATE = 0x222b90810aL; public static final long SYSTEM_USER = 0x5c159a1fd978d40L; public static final long TABLES = 0xd82644fdL; @@ -662,6 +676,7 @@ public class TokenHash { public static final long VARYING = 0x24d7226165L; public static final long VAR_POP = 0x24d724aa00L; public static final long VAR_SAMP = 0x5e674e2055dL; + public static final long VERSION = 0x24f2bb5f63L; public static final long VIEW = 0x233a5fL; public static final long VIRTUAL = 0x250e5c064cL; public static final long WAIT = 0x2413b1L; @@ -700,10 +715,6 @@ public class TokenHash { public static final long SQL_TSI_MONTH = 0x8c3f094f1b906a03L; public static final long SQL_TSI_QUARTER = 0xe9ec207893081b87L; public static final long SQL_TSI_YEAR = 0xea71ed7ecec4f35bL; - public static final long AUTOCOMMIT = 4190263402411880L; - public static final long IFNULL=2366939740L; - public static final long NULLIF=2988454060L; - public static final long REPL = 0x1eecefL; public static void main(String[] args) { MatchMethodGenerator.initShrinkCharTbl(); diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/SelectItemsParser.java b/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/SelectItemsParser.java new file mode 100644 index 0000000..85d060b --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/SelectItemsParser.java @@ -0,0 +1,170 @@ +package io.mycat.mycat2.sqlparser.byteArrayInterface; + +import io.mycat.mycat2.sqlparser.BufferSQLContext; +import io.mycat.mycat2.sqlparser.FunctionHash; +import io.mycat.mycat2.sqlparser.IntTokenHash; +import io.mycat.mycat2.sqlparser.TokenHash; +import io.mycat.mycat2.sqlparser.SQLParseUtils.HashArray; + +public class SelectItemsParser { + + private static boolean isFunction(int pos, HashArray hashArray) { + return hashArray.getType(pos) == Tokenizer2.LEFT_PARENTHESES; + } + + private static boolean isListFinish(int intHash, long hash) { + switch (intHash) { + case IntTokenHash.SQL_DELIMETER: + return true; + case IntTokenHash.FROM: + if (hash == TokenHash.FROM) + return true; + break; + case IntTokenHash.ORDER: + if (hash == TokenHash.ORDER) + return true; + break; + case IntTokenHash.INTO: + if (hash == TokenHash.INTO) + return true; + break; + default: + return false; + } + return false; + } + + public static int pickItemList(int pos, final int arrayCount, HashArray hashArray, BufferSQLContext context) { + int intHash; + int size; + while (pos < arrayCount) { + intHash = hashArray.getIntHash(pos); + size = intHash & 0xFFFF; + + switch (size) { + case 4: + if (intHash == IntTokenHash.USER && hashArray.getHash(pos) == TokenHash.USER && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.USER); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 6: + if (intHash == IntTokenHash.SCHEMA && hashArray.getHash(pos) == TokenHash.SCHEMA && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.SCHEMA); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 7: + if (intHash == IntTokenHash.CHARSET && hashArray.getHash(pos) == TokenHash.CHARSET && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.CHARSET); + pos+=2; + } else if (intHash == IntTokenHash.VERSION && hashArray.getHash(pos) == TokenHash.VERSION && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.VERSION); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 8: + if (intHash == IntTokenHash.DATABASE && hashArray.getHash(pos) == TokenHash.DATABASE && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.DATABASE); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 9: + if (intHash == IntTokenHash.BENCHMARK && hashArray.getHash(pos) == TokenHash.BENCHMARK && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.BENCHMARK); + pos+=2; + } else if (intHash == IntTokenHash.COLLATION && hashArray.getHash(pos) == TokenHash.COLLATION && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.COLLATION); + pos+=2; + } else if (intHash == IntTokenHash.ROW_COUNT && hashArray.getHash(pos) == TokenHash.ROW_COUNT && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.ROW_COUNT); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 10: + if (intHash == IntTokenHash.FOUND_ROWS && hashArray.getHash(pos) == TokenHash.FOUND_ROWS && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.FOUND_ROWS); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 11: + if (intHash == IntTokenHash.SYSTEM_USER && hashArray.getHash(pos) == TokenHash.SYSTEM_USER && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.SYSTEM_USER); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 12: + if (intHash == IntTokenHash.COERCIBILITY && hashArray.getHash(pos) == TokenHash.COERCIBILITY && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.COERCIBILITY); + pos+=2; + } else if (intHash == IntTokenHash.CURRENT_USER && hashArray.getHash(pos) == TokenHash.CURRENT_USER && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.CURRENT_USER); + pos+=2; + } else if (intHash == IntTokenHash.SESSION_USER && hashArray.getHash(pos) == TokenHash.SESSION_USER && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.SESSION_USER); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 13: + if (intHash == IntTokenHash.CONNECTION_ID && hashArray.getHash(pos) == TokenHash.CONNECTION_ID && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.CONNECTION_ID); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + case 14: + if (intHash == IntTokenHash.LAST_INSERT_ID && hashArray.getHash(pos) == TokenHash.LAST_INSERT_ID && isFunction(++pos, hashArray)) { + context.setSelectItem(FunctionHash.LAST_INSERT_ID); + pos+=2; + } else if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + break; + default: + if (isListFinish(intHash, hashArray.getHash(pos))) { + return pos; + } else { + pos++; + } + } + } + return pos; + } +} diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/dynamicAnnotation/impl/DynamicAnnotationUtil.java b/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/dynamicAnnotation/impl/DynamicAnnotationUtil.java index bb8a3f9..9609cf6 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/dynamicAnnotation/impl/DynamicAnnotationUtil.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/dynamicAnnotation/impl/DynamicAnnotationUtil.java @@ -14,7 +14,6 @@ import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -30,13 +29,37 @@ * Created by jamie on 2017/9/13. */ public class DynamicAnnotationUtil { - static final DynamicClassLoader classLoader; - public static final AtomicInteger count=new AtomicInteger(); + private static final DynamicClassLoader classLoader; + private static final JavaCompiler javac; + public static final AtomicInteger count = new AtomicInteger(); private static final Logger logger = LoggerFactory.getLogger(DynamicAnnotationUtil.class); static { classLoader = new DynamicClassLoader("", Thread.currentThread().getContextClassLoader()); + // Issue: the tools.jar exists in ${JAVA_HOME}/lib, but ToolProvider finds it in ${java.home}/lib. + //When ${java.home} isn't same as ${JAVA_HOME} such as eclipse default, no JavaCompiler found! + // Solution: changing ${java.home} to ${JAVA_HOME} temporally. + // @since 2017-10-14 little-pan + final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + if(compiler == null) { + final String envJavaHome = System.getenv("JAVA_HOME"); + if(envJavaHome == null) { + throw new ExceptionInInitializerError("The JAVA_HOME environment variable not configured"); + } + final String propJavaHome= System.getProperty("java.home"); + try { + System.setProperty("java.home", envJavaHome); + javac = ToolProvider.getSystemJavaCompiler(); + }finally { + System.setProperty("java.home", propJavaHome); + } + if(javac == null) { + throw new ExceptionInInitializerError("The tools.jar not found in ${JAVA_HOME}/lib or ${java.home}/lib"); + } + }else { + javac = compiler; + } } public static DynamicAnnotationRuntime compile(String matchName,Map> lines) throws Exception { @@ -79,7 +102,7 @@ public static DynamicAnnotationMatch loadClass(DynamicAnnotationRuntime runtime) } public static void compileTheJavaSrcFile(File... srcFiles) { - JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + final JavaCompiler compiler = javac; try (StandardJavaFileManager fileMgr = compiler.getStandardFileManager(null, null, null)) { JavaCompiler.CompilationTask t = compiler.getTask(null, fileMgr, null, null, null, fileMgr.getJavaFileObjects(srcFiles)); t.call(); diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/mycat/MYCATSQLParser.java b/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/mycat/MYCATSQLParser.java index 8096c4c..cf24029 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/mycat/MYCATSQLParser.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/byteArrayInterface/mycat/MYCATSQLParser.java @@ -13,32 +13,44 @@ * Created by yanjunli on 2017/9/26. */ public class MYCATSQLParser { - - public static int pickMycat(int pos, final int arrayCount, BufferSQLContext context, HashArray hashArray, ByteArrayInterface sql){ long longHash = hashArray.getHash(pos); if (TokenHash.SWITCH == longHash) { TokenizerUtil.debug(pos, context); return pickSwitch(++pos, arrayCount, context, hashArray, sql); - } else { + } else if (TokenHash.SHOW == longHash) { + TokenizerUtil.debug(pos, context); + return pickShow(++pos, arrayCount, context, hashArray, sql); + } else { throw new InvalidParameterException(" the current mycat command is not support!!"); } } - public static int pickSwitch(int pos, final int arrayCount, BufferSQLContext context, HashArray hashArray, ByteArrayInterface sql){ + private static int pickSwitch(int pos, final int arrayCount, BufferSQLContext context, HashArray hashArray, ByteArrayInterface sql){ long longHash = hashArray.getHash(pos); - if(TokenHash.REPL == longHash){ + if (TokenHash.REPL == longHash){ context.setSQLType(BufferSQLContext.MYCAT_SWITCH_REPL); TokenizerUtil.debug(pos, context); return pickSwitchRepl(pos, arrayCount, context, hashArray, sql); - }else{ + } else { throw new InvalidParameterException(" the current mycat command is not support!!"); } } - - public static int pickSwitchRepl(int pos, final int arrayCount, BufferSQLContext context, HashArray hashArray, ByteArrayInterface sql){ + + private static int pickSwitchRepl(int pos, final int arrayCount, BufferSQLContext context, HashArray hashArray, ByteArrayInterface sql){ context.getMyCmdValue().set(IntTokenHash.REPL_NAME, hashArray.getPos(++pos), hashArray.getSize(pos), hashArray.getHash(pos)); context.getMyCmdValue().set(IntTokenHash.REPL_METABEAN_INDEX, hashArray.getPos(++pos), hashArray.getSize(pos), hashArray.getHash(pos)); return pos; } + + private static int pickShow(int pos, final int arrayCount, BufferSQLContext context, HashArray hashArray, ByteArrayInterface sql){ + long longHash = hashArray.getHash(pos); + if (TokenHash.CONFIGS == longHash){ + context.setSQLType(BufferSQLContext.MYCAT_SHOW_CONFIGS); + TokenizerUtil.debug(pos, context); + return pos; + } else { + throw new InvalidParameterException(" the current mycat command is not support!!"); + } + } } diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java index 9be8403..a482bf9 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java @@ -20,6 +20,7 @@ import io.mycat.mysql.packet.HandshakePacket; import io.mycat.mysql.packet.MySQLPacket; import io.mycat.proxy.BufferPool; +import io.mycat.util.ErrorCode; import io.mycat.util.ParseUtil; import io.mycat.util.SecurityUtil; @@ -61,6 +62,16 @@ public void onSocketRead(MySQLSession session) throws IOException { session.curMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 return; } + + if(MySQLPacket.ERROR_PACKET == session.curMSQLPackgInf.pkgType){ + errPkg = new ErrorPacket(); + errPkg.packetId = session.proxyBuffer.getByte(session.curMSQLPackgInf.startPos + + ParseUtil.mysql_packetHeader_length); + errPkg.read(session.proxyBuffer); + logger.warn("backend authed failed. Err No. " + errPkg.errno + "," + errPkg.message); + this.finished(false); + return; + } if (!welcomePkgReceived) { handshake = new HandshakePacket(); @@ -100,13 +111,6 @@ public void onSocketRead(MySQLSession session) throws IOException { if (session.curMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { logger.debug("backend authed suceess "); this.finished(true); - } else if (session.curMSQLPackgInf.pkgType == MySQLPacket.ERROR_PACKET) { - errPkg = new ErrorPacket(); - errPkg.packetId = session.proxyBuffer.getByte(session.curMSQLPackgInf.startPos - + ParseUtil.mysql_packetHeader_length); - errPkg.read(session.proxyBuffer); - logger.warn("backend authed failed. Err No. " + errPkg.errno + "," + errPkg.message); - this.finished(false); } } } @@ -125,9 +129,10 @@ public void onConnect(SelectionKey theKey, MySQLSession userSession, boolean suc } else { errPkg = new ErrorPacket(); + errPkg.packetId = 1; + errPkg.errno = ErrorCode.ERR_CONNECT_SOCKET; errPkg.message = logInfo; finished(false); - } } diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java index 1591ccf..b59f2ed 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java @@ -134,7 +134,6 @@ void onRsFinish(MySQLSession session,boolean success,String msg) { reactor.addMySQLSession(metaBean, session); switch(repBean.getReplicaBean().getRepType()){ - case GROUP_REPLICATION: case MASTER_SLAVE: masterSlaveHeartbeat(); break; diff --git a/source/src/main/java/io/mycat/mysql/packet/FieldPacket.java b/source/src/main/java/io/mycat/mysql/packet/FieldPacket.java new file mode 100644 index 0000000..3c95109 --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/packet/FieldPacket.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql.packet; + +import io.mycat.proxy.ProxyBuffer; +import io.mycat.util.BufferUtil; + +/** + * From Server To Client, part of Result Set Packets. One for each column in the + * result set. Thus, if the value of field_columns in the Result Set Header + * Packet is 3, then the Field Packet occurs 3 times. + * + *
+ * Bytes                      Name
+ * -----                      ----
+ * n (Length Coded String)    catalog
+ * n (Length Coded String)    db
+ * n (Length Coded String)    table
+ * n (Length Coded String)    org_table
+ * n (Length Coded String)    name
+ * n (Length Coded String)    org_name
+ * 1                          (filler)
+ * 2                          charsetNumber
+ * 4                          length
+ * 1                          type
+ * 2                          flags
+ * 1                          decimals
+ * 2                          (filler), always 0x00
+ * n (Length Coded Binary)    default
+ * 
+ * @see http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol#Field_Packet
+ * 
+ * + * @author mycat + */ +public class FieldPacket extends MySQLPacket { + private static final byte[] DEFAULT_CATALOG = "def".getBytes(); + private static final byte[] FILLER = new byte[2]; + + public byte[] catalog = DEFAULT_CATALOG; + public byte[] db; + public byte[] table; + public byte[] orgTable; + public byte[] name; + public byte[] orgName; + public int charsetIndex; + public long length; + public int type; + public int flags; + public byte decimals; + public byte[] definition; + + /** + * 把字节数组转变成FieldPacket + */ + public void read(byte[] data) { + MySQLMessage mm = new MySQLMessage(data); + this.packetLength = mm.readUB3(); + this.packetId = mm.read(); + readBody(mm); + } + + @Override + public void write(ProxyBuffer buffer) { + buffer.writeFixInt(3, calcPacketSize()); + buffer.writeByte(packetId); + writeBody(buffer); + } + + @Override + public int calcPacketSize() { + int size = (catalog == null ? 1 : BufferUtil.getLength(catalog)); + size += (db == null ? 1 : BufferUtil.getLength(db)); + size += (table == null ? 1 : BufferUtil.getLength(table)); + size += (orgTable == null ? 1 : BufferUtil.getLength(orgTable)); + size += (name == null ? 1 : BufferUtil.getLength(name)); + size += (orgName == null ? 1 : BufferUtil.getLength(orgName)); + size += 13;// 1+2+4+1+2+1+2 + if (definition != null) { + size += BufferUtil.getLength(definition); + } + return size; + } + + @Override + protected String getPacketInfo() { + return "MySQL Field Packet"; + } + + private void readBody(MySQLMessage mm) { + this.catalog = mm.readBytesWithLength(); + this.db = mm.readBytesWithLength(); + this.table = mm.readBytesWithLength(); + this.orgTable = mm.readBytesWithLength(); + this.name = mm.readBytesWithLength(); + this.orgName = mm.readBytesWithLength(); + mm.move(1); + this.charsetIndex = mm.readUB2(); + this.length = mm.readUB4(); + this.type = mm.read() & 0xff; + this.flags = mm.readUB2(); + this.decimals = mm.read(); + mm.move(FILLER.length); + if (mm.hasRemaining()) { + this.definition = mm.readBytesWithLength(); + } + } + + private void writeBody(ProxyBuffer buffer) { + writeBytesWithNullable(buffer, catalog); + writeBytesWithNullable(buffer, db); + writeBytesWithNullable(buffer, table); + writeBytesWithNullable(buffer, orgTable); + writeBytesWithNullable(buffer, name); + writeBytesWithNullable(buffer, orgName); + buffer.writeByte((byte) 0x0C); + buffer.writeFixInt(2, charsetIndex); + buffer.writeFixInt(4, length); + buffer.writeByte((byte) (type & 0xff)); + buffer.writeFixInt(2, flags); + buffer.writeByte(decimals); + buffer.writeByte((byte)0x00); + buffer.writeByte((byte)0x00); + //buffer.position(buffer.position() + FILLER.length); + if (definition != null) { + writeBytesWithNullable(buffer, definition); + } + } + + private void writeBytesWithNullable(ProxyBuffer buffer, byte[] bytes) { + byte nullVal = 0; + if (bytes == null) { + buffer.writeByte(nullVal); + } else { + buffer.writeLenencBytes(bytes); + } + } +} \ No newline at end of file diff --git a/source/src/main/java/io/mycat/mysql/packet/ResultSetHeaderPacket.java b/source/src/main/java/io/mycat/mysql/packet/ResultSetHeaderPacket.java new file mode 100644 index 0000000..12ce97f --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/packet/ResultSetHeaderPacket.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql.packet; + +import io.mycat.proxy.ProxyBuffer; +import io.mycat.util.BufferUtil; + +/** + * From server to client after command, if no error and result set -- that is, + * if the command was a query which returned a result set. The Result Set Header + * Packet is the first of several, possibly many, packets that the server sends + * for result sets. The order of packets for a result set is: + * + *
+ * (Result Set Header Packet)   the number of columns
+ * (Field Packets)              column descriptors
+ * (EOF Packet)                 marker: end of Field Packets
+ * (Row Data Packets)           row contents
+ * (EOF Packet)                 marker: end of Data Packets
+ * 
+ * Bytes                        Name
+ * -----                        ----
+ * 1-9   (Length-Coded-Binary)  field_count
+ * 1-9   (Length-Coded-Binary)  extra
+ * 
+ * @see http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol#Result_Set_Header_Packet
+ * 
+ * + * @author mycat + */ +public class ResultSetHeaderPacket extends MySQLPacket { + public int fieldCount; + public long extra; + + public void read(byte[] data) { + MySQLMessage mm = new MySQLMessage(data); + this.packetLength = mm.readUB3(); + this.packetId = mm.read(); + this.fieldCount = (int) mm.readLength(); + if (mm.hasRemaining()) { + this.extra = mm.readLength(); + } + } + + @Override + public void write(ProxyBuffer buffer) { + buffer.writeFixInt(3, calcPacketSize()); + buffer.writeByte(packetId); + buffer.writeFixInt(1, fieldCount); + if (extra > 0) { + buffer.writeLenencInt(extra); + } + } + + @Override + public int calcPacketSize() { + int size = BufferUtil.getLength(fieldCount); + if (extra > 0) { + size += BufferUtil.getLength(extra); + } + return size; + } + + @Override + protected String getPacketInfo() { + return "MySQL ResultSetHeader Packet"; + } +} \ No newline at end of file diff --git a/source/src/main/java/io/mycat/mysql/packet/RowDataPacket.java b/source/src/main/java/io/mycat/mysql/packet/RowDataPacket.java new file mode 100644 index 0000000..b1530bf --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/packet/RowDataPacket.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. DO NOT ALTER OR + * REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese opensource volunteers. + * you can redistribute it and/or modify it under the terms of the GNU General Public License + * version 2 only, as published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version 2 along with this work; + * if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql.packet; + +import io.mycat.proxy.ProxyBuffer; +import io.mycat.util.BufferUtil; + +import java.util.ArrayList; +import java.util.List; + +/** + * From server to client. One packet for each row in the result set. + *

+ * + *

+ * Bytes                   Name
+ * -----                   ----
+ * n (Length Coded String) (column value)
+ * ...
+ *
+ * (column value):         The data in the column, as a character string.
+ *                         If a column is defined as non-character, the
+ *                         server converts the value into a character
+ *                         before sending it. Since the value is a Length
+ *                         Coded String, a NULL can be represented with a
+ *                         single byte containing 251(see the description
+ *                         of Length Coded Strings in section "Elements" above).
+ *
+ * @see http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol#Row_Data_Packet
+ * 
+ * + * @author mycat + */ +public class RowDataPacket extends MySQLPacket { + private static final byte NULL_MARK = (byte) 251; + private static final byte EMPTY_MARK = (byte) 0; + public int fieldCount; + public final List fieldValues; + + public RowDataPacket(int fieldCount) { + this.fieldCount = fieldCount; + this.fieldValues = new ArrayList(fieldCount); + } + + public void add(byte[] value) { + // 这里应该修改value + fieldValues.add(value); + } + + public void addFieldCount(int add) { + // 这里应该修改field + fieldCount = fieldCount + add; + } + + public void read(byte[] data) { + MySQLMessage mm = new MySQLMessage(data); + packetLength = mm.readUB3(); + packetId = mm.read(); + for (int i = 0; i < fieldCount; i++) { + fieldValues.add(mm.readBytesWithLength()); + } + } + + @Override + public void write(ProxyBuffer buffer) { + buffer.writeFixInt(3, calcPacketSize()); + buffer.writeByte(packetId); + for (int i = 0; i < fieldCount; i++) { + byte[] fv = fieldValues.get(i); + if (fv == null) { + buffer.writeByte(RowDataPacket.NULL_MARK); + } else if (fv.length == 0) { + buffer.writeByte(RowDataPacket.EMPTY_MARK); + } else { + buffer.writeLenencBytes(fv); + } + } + } + + @Override + public int calcPacketSize() { + int size = 0; + for (int i = 0; i < fieldCount; i++) { + byte[] v = fieldValues.get(i); + size += (v == null || v.length == 0) ? 1 : BufferUtil.getLength(v); + } + return size; + } + + @Override + protected String getPacketInfo() { + return "MySQL RowData Packet"; + } +} diff --git a/source/src/main/java/io/mycat/proxy/AbstractSession.java b/source/src/main/java/io/mycat/proxy/AbstractSession.java index 458bf05..995e5d5 100644 --- a/source/src/main/java/io/mycat/proxy/AbstractSession.java +++ b/source/src/main/java/io/mycat/proxy/AbstractSession.java @@ -142,11 +142,6 @@ public boolean readFromChannel() throws IOException { proxyBuffer.writeIndex = buffer.position(); return readed > 0; } - - private void closeSocket(boolean normal,String msg) throws IOException{ - close(false,msg); - throw new ClosedChannelException(); - } protected abstract void doTakeReadOwner(); diff --git a/source/src/main/java/io/mycat/proxy/MycatReactorThread.java b/source/src/main/java/io/mycat/proxy/MycatReactorThread.java index 2c37215..5e2b04c 100644 --- a/source/src/main/java/io/mycat/proxy/MycatReactorThread.java +++ b/source/src/main/java/io/mycat/proxy/MycatReactorThread.java @@ -212,7 +212,7 @@ public void getMysqlSession(MySQLMetaBean mySQLMetaBean,AsynTaskCallBack