diff --git a/go/data/query_log_parse.go b/go/data/query_log_parse.go index 496510d..cf5c87b 100644 --- a/go/data/query_log_parse.go +++ b/go/data/query_log_parse.go @@ -19,6 +19,8 @@ package data import ( "bufio" "errors" + "fmt" + "io" "os" "regexp" "sync" @@ -29,7 +31,7 @@ type ( logReaderState struct { fd *os.File - scanner *bufio.Scanner + reader *bufio.Reader reg *regexp.Regexp mu sync.Mutex lineNumber int @@ -65,9 +67,18 @@ func (s *mysqlLogReaderState) Next() (Query, bool) { return Query{}, false } - for s.scanner.Scan() { + for { + line, done, err := s.readLine() + if err != nil { + s.err = fmt.Errorf("error reading file: %w", err) + return Query{}, false + } + if done { + break + } + s.lineNumber++ - line := s.scanner.Text() + if len(line) == 0 { continue } @@ -82,20 +93,7 @@ func (s *mysqlLogReaderState) Next() (Query, bool) { // If we have a previous query, return it before processing the new line if s.prevQuery != "" { - query := Query{ - Query: s.prevQuery, - Line: s.queryStart, - Type: QueryT, - } - s.prevQuery = "" - - // If the new line is a query, store it for next iteration - if matches[3] == "Query" { - s.prevQuery = matches[4] - s.queryStart = s.lineNumber - } - - return query, true + return s.processQuery(matches), true } // Start a new query if this line is a query @@ -108,19 +106,62 @@ func (s *mysqlLogReaderState) Next() (Query, bool) { // Return the last query if we have one if s.prevQuery != "" { - query := Query{ - Query: s.prevQuery, - Line: s.queryStart, - Type: QueryT, - } - s.prevQuery = "" - return query, true + return s.finalizeQuery(), true } - s.err = s.scanner.Err() return Query{}, false } +func (s *logReaderState) readLine() (string, bool, error) { + line, isPrefix, err := s.reader.ReadLine() + if err == io.EOF { + return "", true, nil + } + if err != nil { + return "", false, err + } + + // Handle lines longer than the buffer size + totalLine := append([]byte{}, line...) + for isPrefix { + line, isPrefix, err = s.reader.ReadLine() + if err == io.EOF { + break + } + if err != nil { + return "", false, err + } + totalLine = append(totalLine, line...) + } + return string(totalLine), false, nil +} + +func (s *mysqlLogReaderState) finalizeQuery() Query { + query := Query{ + Query: s.prevQuery, + Line: s.queryStart, + Type: QueryT, + } + s.prevQuery = "" + return query +} + +func (s *mysqlLogReaderState) processQuery(matches []string) Query { + query := Query{ + Query: s.prevQuery, + Line: s.queryStart, + Type: QueryT, + } + s.prevQuery = "" + + // If the new line is a query, store it for next iteration + if matches[3] == "Query" { + s.prevQuery = matches[4] + s.queryStart = s.lineNumber + } + return query +} + func (s *logReaderState) Close() error { s.mu.Lock() defer s.mu.Unlock() @@ -136,15 +177,6 @@ func (s *logReaderState) Close() error { return s.err } -func (s *logReaderState) NextLine() (string, bool) { - more := s.scanner.Scan() - if !more { - return "", false - } - - return s.scanner.Text(), true -} - func (MySQLLogLoader) Load(fileName string) IteratorLoader { reg := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z)\s+(\d+)\s+(\w+)\s+(.*)`) @@ -153,13 +185,11 @@ func (MySQLLogLoader) Load(fileName string) IteratorLoader { return &errLoader{err} } - scanner := bufio.NewScanner(fd) - return &mysqlLogReaderState{ logReaderState: logReaderState{ - scanner: scanner, - reg: reg, - fd: fd, + reader: bufio.NewReader(fd), + reg: reg, + fd: fd, }, } } diff --git a/go/data/slow_query_log_loader.go b/go/data/slow_query_log_loader.go index d268789..d302cda 100644 --- a/go/data/slow_query_log_loader.go +++ b/go/data/slow_query_log_loader.go @@ -52,9 +52,17 @@ func (s *slowQueryLogReaderState) Next() (Query, bool) { newStmt: true, } - for s.scanner.Scan() { + for { + line, done, err := s.readLine() + if err != nil { + s.err = fmt.Errorf("error reading file: %w", err) + return Query{}, false + } + if done { + break + } s.lineNumber++ - line := strings.TrimSpace(s.scanner.Text()) + line = strings.TrimSpace(line) result, done, err := s.processLine(line, state) if err != nil { @@ -66,11 +74,6 @@ func (s *slowQueryLogReaderState) Next() (Query, bool) { } } - if err := s.scanner.Err(); err != nil { - s.err = err - return Query{}, false - } - if !state.newStmt && state.currentQuery.Query != "" { s.err = errors.New("EOF: missing semicolon") } @@ -245,7 +248,7 @@ func readData(url string) ([]byte, error) { } func (SlowQueryLogLoader) Load(filename string) IteratorLoader { - var scanner *bufio.Scanner + var reader *bufio.Reader var fd *os.File if strings.HasPrefix(filename, "http") { @@ -253,20 +256,20 @@ func (SlowQueryLogLoader) Load(filename string) IteratorLoader { if err != nil { return &errLoader{err: err} } - scanner = bufio.NewScanner(bytes.NewReader(data)) + reader = bufio.NewReader(bytes.NewReader(data)) } else { var err error fd, err = os.OpenFile(filename, os.O_RDONLY, 0) if err != nil { return &errLoader{err: err} } - scanner = bufio.NewScanner(fd) + reader = bufio.NewReader(fd) } return &slowQueryLogReaderState{ logReaderState: logReaderState{ - scanner: scanner, - fd: fd, + fd: fd, + reader: reader, }, } } diff --git a/go/data/vtgate_log_parse.go b/go/data/vtgate_log_parse.go index 889ec50..76644a5 100644 --- a/go/data/vtgate_log_parse.go +++ b/go/data/vtgate_log_parse.go @@ -52,13 +52,11 @@ func (vll VtGateLogLoader) Load(fileName string) IteratorLoader { return &errLoader{err: err} } - scanner := bufio.NewScanner(fd) - return &vtgateLogReaderState{ logReaderState: logReaderState{ - scanner: scanner, - reg: reg, - fd: fd, + reader: bufio.NewReader(fd), + reg: reg, + fd: fd, }, NeedsBindVars: vll.NeedsBindVars, } @@ -76,9 +74,16 @@ func (s *vtgateLogReaderState) Next() (Query, bool) { return Query{}, false } - for s.scanner.Scan() { + for { + line, done, err := s.readLine() + if err != nil { + s.fail(fmt.Errorf("error reading file: %w", err)) + return Query{}, false + } + if done { + break + } s.lineNumber++ - line := s.scanner.Text() if len(line) == 0 { continue