Skip to content

Commit

Permalink
Allow for multiple connections
Browse files Browse the repository at this point in the history
Breaking changes
Must initialize db (Quacker) then use NewConnection to retrieve a connection.
Previous Quacker methods (except close) moved to QuackCon
  • Loading branch information
loicalleyne committed Nov 29, 2024
1 parent b00b7b4 commit b4f5a5c
Showing 1 changed file with 41 additions and 16 deletions.
57 changes: 41 additions & 16 deletions couac.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,20 @@ const (
ObjectDepthColumns = ObjectDepthAll
)

// Quacker represents a DuckDB database.
type Quacker struct {
ctx context.Context
drv adbc.Driver
db adbc.Database
conn adbc.Connection
ctx context.Context
drv adbc.Driver
db adbc.Database
// duckdb database connections
ducklings []*QuackCon
}

// QuackCon represents a connection to a DuckDB database.
type QuackCon struct {
parent *Quacker
conn adbc.Connection
}
type Statement = adbc.Statement

// NewDuck opens a DuckDB database. The driverPath argument specifies the location of
Expand All @@ -52,23 +59,41 @@ func NewDuck(path string, driverPath *string) (*Quacker, error) {
if err != nil {
return nil, fmt.Errorf("new database error: %v", err)
}
couac.conn, err = couac.db.Open(context.Background())

return couac, nil
}

func (q *Quacker) NewConnection() (*QuackCon, error) {
var err error
qc := new(QuackCon)
qc.conn, err = q.db.Open(context.Background())
if err != nil {
return nil, fmt.Errorf("db open error: %v", err)
}
return couac, nil
qc.parent = q
q.ducklings = append(q.ducklings, qc)
return qc, nil
}

// Close closes the connection to the database and releases any associated resources.
// Close closes the database and releases any associated resources.
// It is important to do this to allow DuckDB to properly commit all WAL file changes before closing.
func (q *Quacker) Close() {
q.conn.Close()
for _, d := range q.ducklings {
d.conn.Close()
}
q.db.Close()
}

// Close closes the connection to database and releases any associated resources.
// It is important to do this to allow DuckDB to properly commit all WAL file changes before closing.
func (q *QuackCon) Close() {
q.parent = nil
q.conn.Close()
}

// Exec executes a statement that does not generate a result
// set. It returns the number of rows affected if known, otherwise -1.
func (q *Quacker) Exec(ctx context.Context, query string) (int64, error) {
func (q *QuackCon) Exec(ctx context.Context, query string) (int64, error) {
var u int64
stmt, err := q.conn.NewStatement()
if err != nil {
Expand Down Expand Up @@ -160,8 +185,8 @@ func (q *Quacker) Exec(ctx context.Context, query string) (int64, error) {
// fk_db_schema | utf8
// fk_table | utf8 not null
// fk_column_name | utf8 not null
func (q *Quacker) GetObjectsMap() ([]map[string]any, error) {
rr, err := q.conn.GetObjects(q.ctx, adbc.ObjectDepthAll, nil, nil, nil, nil, nil)
func (q *QuackCon) GetObjectsMap() ([]map[string]any, error) {
rr, err := q.conn.GetObjects(q.parent.ctx, adbc.ObjectDepthAll, nil, nil, nil, nil, nil)
if err != nil {
return nil, err
}
Expand All @@ -185,15 +210,15 @@ func (q *Quacker) GetObjectsMap() ([]map[string]any, error) {

// GetTableSchema returns the Arrow scheme of a DuckDB table. Pass nil for catalog and dbSchema
// to use the default catalog and database schema.
func (q *Quacker) GetTableSchema(ctx context.Context, catalog, dbSchema *string, tableName string) (*arrow.Schema, error) {
func (q *QuackCon) GetTableSchema(ctx context.Context, catalog, dbSchema *string, tableName string) (*arrow.Schema, error) {
return q.conn.GetTableSchema(ctx, catalog, dbSchema, tableName)
}

// IngestCreateAppend attempts to ingest an Arrow record into the DuckDB database, creating the table
// from the record's schema if it does not exist. It returns the number of rows affected if known, otherwise -1.
// Ingest mode switches between Create and Append since DuckDB does not currently support CreateAppend mode.
// DuckDB also does not support AutoCommit option.
func (q *Quacker) IngestCreateAppend(ctx context.Context, destTable string, rec arrow.Record) (int64, error) {
func (q *QuackCon) IngestCreateAppend(ctx context.Context, destTable string, rec arrow.Record) (int64, error) {
var u int64
if destTable == "" {
return u, fmt.Errorf("destination table name error")
Expand All @@ -202,7 +227,7 @@ func (q *Quacker) IngestCreateAppend(ctx context.Context, destTable string, rec
return u, fmt.Errorf("nil arrow record")
}

schema, _ := q.conn.GetTableSchema(q.ctx, nil, nil, destTable)
schema, _ := q.conn.GetTableSchema(q.parent.ctx, nil, nil, destTable)

stmt, err := q.conn.NewStatement()
if err != nil {
Expand Down Expand Up @@ -241,7 +266,7 @@ func (q *Quacker) IngestCreateAppend(ctx context.Context, destTable string, rec

// NewStatement initializes a new statement object tied to an open connection.
// The caller must close the statement when done with it.
func (q *Quacker) NewStatement() (Statement, error) {
func (q *QuackCon) NewStatement() (Statement, error) {
if q.conn == nil {
return nil, fmt.Errorf("database connection is closed")
}
Expand All @@ -253,7 +278,7 @@ func (q *Quacker) NewStatement() (Statement, error) {
// RecordReader.
// Since ADBC 1.1.0: releasing the returned RecordReader without consuming it fully is equivalent to
// calling AdbcStatementCancel.
func (q *Quacker) Query(ctx context.Context, query string) (array.RecordReader, adbc.Statement, int64, error) {
func (q *QuackCon) Query(ctx context.Context, query string) (array.RecordReader, adbc.Statement, int64, error) {
var u int64
stmt, err := q.conn.NewStatement()
if err != nil {
Expand Down

0 comments on commit b4f5a5c

Please sign in to comment.