Skip to content

Commit

Permalink
refactor: move to bufio.Reader instead of Scanner
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Nov 15, 2024
1 parent 50f2cb6 commit e53f3e1
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 58 deletions.
108 changes: 69 additions & 39 deletions go/data/query_log_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package data
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"regexp"
"sync"
Expand All @@ -29,7 +31,7 @@ type (

logReaderState struct {
fd *os.File
scanner *bufio.Scanner
reader *bufio.Reader
reg *regexp.Regexp
mu sync.Mutex
lineNumber int
Expand Down Expand Up @@ -65,9 +67,18 @@ func (s *mysqlLogReaderState) Next() (Query, bool) {
return Query{}, false
}

for s.scanner.Scan() {
for {
line, done, err := s.readLine()
if err != nil {
s.err = fmt.Errorf("error reading file: %w", err)
return Query{}, false
}
if done {
break
}

s.lineNumber++
line := s.scanner.Text()

if len(line) == 0 {
continue
}
Expand All @@ -82,20 +93,7 @@ func (s *mysqlLogReaderState) Next() (Query, bool) {

// If we have a previous query, return it before processing the new line
if s.prevQuery != "" {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
}
s.prevQuery = ""

// If the new line is a query, store it for next iteration
if matches[3] == "Query" {
s.prevQuery = matches[4]
s.queryStart = s.lineNumber
}

return query, true
return s.processQuery(matches), true
}

// Start a new query if this line is a query
Expand All @@ -108,19 +106,62 @@ func (s *mysqlLogReaderState) Next() (Query, bool) {

// Return the last query if we have one
if s.prevQuery != "" {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
}
s.prevQuery = ""
return query, true
return s.finalizeQuery(), true
}

s.err = s.scanner.Err()
return Query{}, false
}

// readLine returns the next logical line from s.reader with its line
// terminator ("\n" or "\r\n") removed.
//
// Results:
//   - (line, false, nil) for every line that was read, including empty ones;
//   - ("", true, nil) once the input is exhausted;
//   - ("", false, err) when the reader fails with anything other than io.EOF.
//
// bufio.Reader.ReadLine hands back lines longer than its buffer in several
// fragments (isPrefix == true); those fragments are stitched together here so
// callers always see whole lines regardless of length.
func (s *logReaderState) readLine() (string, bool, error) {
	chunk, isPrefix, err := s.reader.ReadLine()
	if errors.Is(err, io.EOF) {
		return "", true, nil
	}
	if err != nil {
		return "", false, err
	}

	// Fast path: the whole line fit in the buffer. The string conversion
	// already copies the bytes out of the reader's internal buffer, so no
	// intermediate slice is required.
	if !isPrefix {
		return string(chunk), false, nil
	}

	// Slow path: chunk is only valid until the next ReadLine call, so it must
	// be copied before more fragments are requested.
	full := append([]byte(nil), chunk...)
	for isPrefix {
		chunk, isPrefix, err = s.reader.ReadLine()
		if errors.Is(err, io.EOF) {
			// The input ended right after a buffer-sized fragment; whatever
			// has been collected so far is the final line.
			break
		}
		if err != nil {
			return "", false, err
		}
		full = append(full, chunk...)
	}
	return string(full), false, nil
}

// finalizeQuery hands back the query currently buffered in s.prevQuery,
// tagged with the line it started on (s.queryStart), and empties the buffer
// so the same query is not reported twice.
func (s *mysqlLogReaderState) finalizeQuery() Query {
	pending := s.prevQuery
	s.prevQuery = ""
	return Query{Query: pending, Line: s.queryStart, Type: QueryT}
}

// processQuery emits the query buffered so far and, when the freshly matched
// log line is itself a "Query" entry, starts buffering that one in its place.
//
// matches is indexed at positions 3 (entry kind) and 4 (entry text); the
// caller must pass a slice long enough for both.
func (s *mysqlLogReaderState) processQuery(matches []string) Query {
	pending, startedAt := s.prevQuery, s.queryStart
	s.prevQuery = ""

	// A new "Query" entry becomes the buffered query for the next call.
	if matches[3] == "Query" {
		s.prevQuery, s.queryStart = matches[4], s.lineNumber
	}

	return Query{Query: pending, Line: startedAt, Type: QueryT}
}

func (s *logReaderState) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -136,15 +177,6 @@ func (s *logReaderState) Close() error {
return s.err
}

// NextLine advances the scanner by one line and returns its text.
// The boolean is false, and the string empty, once Scan reports that no
// further line is available.
func (s *logReaderState) NextLine() (string, bool) {
	if s.scanner.Scan() {
		return s.scanner.Text(), true
	}
	return "", false
}

func (MySQLLogLoader) Load(fileName string) IteratorLoader {
reg := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z)\s+(\d+)\s+(\w+)\s+(.*)`)

Expand All @@ -153,13 +185,11 @@ func (MySQLLogLoader) Load(fileName string) IteratorLoader {
return &errLoader{err}
}

scanner := bufio.NewScanner(fd)

return &mysqlLogReaderState{
logReaderState: logReaderState{
scanner: scanner,
reg: reg,
fd: fd,
reader: bufio.NewReader(fd),
reg: reg,
fd: fd,
},
}
}
27 changes: 15 additions & 12 deletions go/data/slow_query_log_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,17 @@ func (s *slowQueryLogReaderState) Next() (Query, bool) {
newStmt: true,
}

for s.scanner.Scan() {
for {
line, done, err := s.readLine()
if err != nil {
s.err = fmt.Errorf("error reading file: %w", err)
return Query{}, false
}
if done {
break
}
s.lineNumber++
line := strings.TrimSpace(s.scanner.Text())
line = strings.TrimSpace(line)

result, done, err := s.processLine(line, state)
if err != nil {
Expand All @@ -66,11 +74,6 @@ func (s *slowQueryLogReaderState) Next() (Query, bool) {
}
}

if err := s.scanner.Err(); err != nil {
s.err = err
return Query{}, false
}

if !state.newStmt && state.currentQuery.Query != "" {
s.err = errors.New("EOF: missing semicolon")
}
Expand Down Expand Up @@ -245,28 +248,28 @@ func readData(url string) ([]byte, error) {
}

func (SlowQueryLogLoader) Load(filename string) IteratorLoader {
var scanner *bufio.Scanner
var reader *bufio.Reader
var fd *os.File

if strings.HasPrefix(filename, "http") {
data, err := readData(filename)
if err != nil {
return &errLoader{err: err}
}
scanner = bufio.NewScanner(bytes.NewReader(data))
reader = bufio.NewReader(bytes.NewReader(data))
} else {
var err error
fd, err = os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil {
return &errLoader{err: err}
}
scanner = bufio.NewScanner(fd)
reader = bufio.NewReader(fd)
}

return &slowQueryLogReaderState{
logReaderState: logReaderState{
scanner: scanner,
fd: fd,
fd: fd,
reader: reader,
},
}
}
Expand Down
19 changes: 12 additions & 7 deletions go/data/vtgate_log_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ func (vll VtGateLogLoader) Load(fileName string) IteratorLoader {
return &errLoader{err: err}
}

scanner := bufio.NewScanner(fd)

return &vtgateLogReaderState{
logReaderState: logReaderState{
scanner: scanner,
reg: reg,
fd: fd,
reader: bufio.NewReader(fd),
reg: reg,
fd: fd,
},
NeedsBindVars: vll.NeedsBindVars,
}
Expand All @@ -76,9 +74,16 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
return Query{}, false
}

for s.scanner.Scan() {
for {
line, done, err := s.readLine()
if err != nil {
s.fail(fmt.Errorf("error reading file: %w", err))
return Query{}, false
}
if done {
break
}
s.lineNumber++
line := s.scanner.Text()

if len(line) == 0 {
continue
Expand Down

0 comments on commit e53f3e1

Please sign in to comment.