Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Read replica support #476

Merged
merged 27 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2ce89df
added variables for tracking time since last write
a-alhusaini May 5, 2023
dcaa7d2
added variables to store reader and writer db connections
a-alhusaini May 7, 2023
b4010f4
switched from time.utc to time.monotonic for tracking time since last…
a-alhusaini May 7, 2023
fcec217
Added methods to switch between connections.
a-alhusaini May 7, 2023
d6658ea
added instance variant of switch_to_writer_adapter for use with callb…
a-alhusaini May 7, 2023
b22e62b
Fixed typo in def switch_to_writer_adapter
a-alhusaini May 7, 2023
9bc3743
added logic to automatically switch to reader adapter
a-alhusaini May 7, 2023
438f04c
functions in querying now dynamically change adapter based on need
a-alhusaini May 7, 2023
d2fc1d0
ensure all methods in query builder that need primary database switch…
a-alhusaini May 7, 2023
c2eb32d
added better error message for invalid connections
a-alhusaini May 10, 2023
2bd49f1
groundwork for replica testing
a-alhusaini May 10, 2023
21878b2
fixed error where mysql tests don't run
a-alhusaini May 10, 2023
866e00a
Granite::Base.adapter class method now actually fetches current adapt…
a-alhusaini May 10, 2023
720005d
fixed typo. Invalid adapter_type for pg_with_replica
a-alhusaini May 10, 2023
f6cf749
fixed True to true. Added table name to ReplicatedChat.
a-alhusaini May 10, 2023
4e97c87
update specs
kalinon May 11, 2023
4f1ef67
Merge pull request #1 from kalinon/track_last_query
a-alhusaini May 11, 2023
431ed8e
spec updates
kalinon May 11, 2023
e531675
Merge pull request #2 from kalinon/track_last_query
a-alhusaini May 11, 2023
4976d11
Update .gitignore
crimson-knight May 15, 2023
6ab17b0
moved connection management logic to seperate module. Fixed bug where…
a-alhusaini May 20, 2023
74ff74c
fixed error where reader connection switch ignored specified wait per…
a-alhusaini May 21, 2023
8ecc010
moved default value for connection switch wait period to granite::co…
a-alhusaini May 21, 2023
76133f3
moved connection macro to connection management module
a-alhusaini May 21, 2023
e11cf7c
cleaned up code for fetching first connection
a-alhusaini May 21, 2023
058b414
finalized syntax for adding new connections to granite::connections
a-alhusaini May 21, 2023
122be6c
optimization: when reader & writer database are the same do not dupli…
a-alhusaini May 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
PG_DATABASE_URL=postgres://granite:password@localhost:5432/granite_db
PG_REPLICA_URL=postgres://granite:password@localhost:5432/granite_db
robacarp marked this conversation as resolved.
Show resolved Hide resolved
MYSQL_DATABASE_URL=mysql://granite:password@localhost:3306/granite_db
MYSQL_REPLICA_URL=mysql://granite:password@localhost:3306/granite_db
SQLITE_DATABASE_URL=sqlite3:./granite.db
SQLITE_REPLICA_URL=sqlite3:./granite_replica.db
CURRENT_ADAPTER=pg
PG_VERSION=15.2
MYSQL_VERSION=5.7
Expand Down
14 changes: 6 additions & 8 deletions .github/workflows/spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ jobs:
with:
crystal: ${{ matrix.crystal }}
- name: Install shards
run: shards update --ignore-crystal-version
run: shards update --ignore-crystal-version --skip-postinstall --skip-executables
- name: Run tests
timeout-minutes: 2
run: crystal spec
env:
CURRENT_ADAPTER: sqlite
SQLITE_DATABASE_URL: sqlite3:./granite.db
MYSQL_DATABASE_URL: mysql://granite:password@localhost:3306/granite_db
PG_DATABASE_URL: postgres://granite:password@localhost:5432/granite_db
SQLITE_REPLICA_URL: sqlite3:./granite_replica.db
mysql-spec:
runs-on: ubuntu-latest
strategy:
Expand Down Expand Up @@ -71,15 +70,15 @@ jobs:
with:
crystal: ${{ matrix.crystal }}
- name: Install shards
run: shards update --ignore-crystal-version
run: shards update --ignore-crystal-version --skip-postinstall --skip-executables
- name: Run tests
timeout-minutes: 2
run: crystal spec
env:
CURRENT_ADAPTER: mysql
SQLITE_DATABASE_URL: sqlite3:./granite.db
MYSQL_DATABASE_URL: mysql://granite:password@localhost:3306/granite_db
PG_DATABASE_URL: postgres://granite:password@localhost:5432/granite_db
MYSQL_REPLICA_URL: mysql://granite:password@localhost:3306/granite_db
psql-spec:
runs-on: ubuntu-latest
strategy:
Expand Down Expand Up @@ -110,12 +109,11 @@ jobs:
with:
crystal: ${{ matrix.crystal }}
- name: Install shards
run: shards update --ignore-crystal-version
run: shards update --ignore-crystal-version --skip-postinstall --skip-executables
- name: Run tests
timeout-minutes: 2
run: crystal spec
env:
CURRENT_ADAPTER: pg
SQLITE_DATABASE_URL: sqlite3:./granite.db
MYSQL_DATABASE_URL: mysql://granite:password@localhost:3306/granite_db
PG_DATABASE_URL: postgres://granite:password@localhost:5432/granite_db
PG_REPLICA_URL: postgres://granite:password@localhost:5432/granite_db
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ shard.lock

# Ignore bin because they will be build with shards install
bin
.env
11 changes: 11 additions & 0 deletions export.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export PG_DATABASE_URL=postgres://granite:password@localhost:5432/granite_db
export PG_REPLICA_URL=postgres://granite:password@localhost:5432/granite__replica_db
export MYSQL_DATABASE_URL=mysql://granite:password@localhost:3306/granite_db
export MYSQL_REPLICA_URL=mysql://granite:password@localhost:3306/granite_replica_db
export SQLITE_DATABASE_URL=sqlite3:./granite.db
export SQLITE_REPLICA_URL=sqlite3:./granite_replica.db
export PG_VERSION=15.2
export MYSQL_VERSION=5.7
export SQLITE_VERSION=3110000
export SQLITE_VERSION_YEAR=2016
export CURRENT_ADAPTER=sqlite
40 changes: 20 additions & 20 deletions spec/adapter/adapters_spec.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
require "../spec_helper"

class Foo < Granite::Base
connection sqlite

connection {{env("CURRENT_ADAPTER").id}}
column id : Int64, primary: true
end

Expand All @@ -13,41 +12,42 @@ end
describe Granite::Connections do
describe "registration" do
it "should allow connections to be be saved and looked up" do
Granite::Connections.registered_connections.size.should eq 3
Granite::Connections.registered_connections.size.should eq 2

if connection = Granite::Connections["mysql"]
connection.url.should eq ENV["MYSQL_DATABASE_URL"]
else
connection.should_not be_falsey
end
if connection = Granite::Connections["pg"]
connection.url.should eq ENV["PG_DATABASE_URL"]
if connection = Granite::Connections[CURRENT_ADAPTER]
connection[:writer].url.should eq ADAPTER_URL
else
connection.should_not be_falsey
end
if connection = Granite::Connections["sqlite"]
connection.url.should eq ENV["SQLITE_DATABASE_URL"]
else
connection.should_not be_falsey

case ENV["CURRENT_ADAPTER"]?
when "sqlite"
if connection = Granite::Connections["sqlite_with_replica"]
connection[:writer].url.should eq ENV["SQLITE_DATABASE_URL"]?
connection[:reader].url.should eq ADAPTER_REPLICA_URL
else
connection.should_not be_falsey
end
end
end

it "should disallow multiple connections with the same name" do
expect_raises(Exception, "Adapter with name 'mysql' has already been registered.") do
Granite::Connections << Granite::Adapter::Pg.new(name: "mysql", url: ENV["PG_DATABASE_URL"])
Granite::Connections << Granite::Adapter::Pg.new(name: "mysql2", url: "mysql://localhost:3306/test")
expect_raises(Exception, "Adapter with name 'mysql2' has already been registered.") do
Granite::Connections << Granite::Adapter::Pg.new(name: "mysql2", url: "mysql://localhost:3306/test")
end
end

it "should assign the correct connections to a model" do
adapter = Foo.adapter
adapter.name.should eq "sqlite"
adapter.url.should eq ENV["SQLITE_DATABASE_URL"]
adapter.name.should eq CURRENT_ADAPTER
adapter.url.should eq ADAPTER_URL
end

it "should use the first registered connection if none are specified" do
adapter = Bar.adapter
adapter.name.should eq "mysql"
adapter.url.should eq ENV["MYSQL_DATABASE_URL"]
adapter.name.should eq CURRENT_ADAPTER
adapter.url.should eq ADAPTER_URL
end
end
end
12 changes: 12 additions & 0 deletions spec/granite/connection_management_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require "spec"

describe "Granite::Base track time since last write" do
it "should switch to reader db connection after connection_switch_wait_period after write operation" do
ReplicatedChat.connection_switch_wait_period = 250
ReplicatedChat.new(content: "hello world!").save!
sleep 500.milliseconds
current_url = ReplicatedChat.adapter.url
reader_url = Granite::Connections[ENV["CURRENT_ADAPTER"] + "_with_replica"].not_nil![:reader].url
current_url.should eq reader_url
end
end
2 changes: 1 addition & 1 deletion spec/run_test_dbs.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ docker run --name mysql -d \
-e MYSQL_USER=granite \
-e MYSQL_PASSWORD=password \
-p 3306:3306 \
mysql:%{MYSQL_VERSION}
mysql:${MYSQL_VERSION}

docker run --name psql -d \
-e POSTGRES_USER=granite \
Expand Down
24 changes: 20 additions & 4 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,25 @@ require "mysql"
require "pg"
require "sqlite3"

Granite::Connections << Granite::Adapter::Mysql.new(name: "mysql", url: ENV["MYSQL_DATABASE_URL"])
Granite::Connections << Granite::Adapter::Pg.new(name: "pg", url: ENV["PG_DATABASE_URL"])
Granite::Connections << Granite::Adapter::Sqlite.new(name: "sqlite", url: ENV["SQLITE_DATABASE_URL"])
CURRENT_ADAPTER = ENV["CURRENT_ADAPTER"]
ADAPTER_URL = ENV["#{CURRENT_ADAPTER.upcase}_DATABASE_URL"]
ADAPTER_REPLICA_URL = ENV["#{CURRENT_ADAPTER.upcase}_REPLICA_URL"]? || ADAPTER_URL

case CURRENT_ADAPTER
when "pg"
Granite::Connections << Granite::Adapter::Pg.new(name: CURRENT_ADAPTER, url: ADAPTER_URL)
Granite::Connections << {name: "pg_with_replica", writer: ADAPTER_URL, reader: ADAPTER_REPLICA_URL, adapter_type: Granite::Adapter::Pg}
when "mysql"
Granite::Connections << Granite::Adapter::Mysql.new(name: CURRENT_ADAPTER, url: ADAPTER_URL)
Granite::Connections << {name: "mysql_with_replica", writer: ADAPTER_URL, reader: ADAPTER_REPLICA_URL, adapter_type: Granite::Adapter::Mysql}
when "sqlite"
Granite::Connections << Granite::Adapter::Sqlite.new(name: CURRENT_ADAPTER, url: ADAPTER_URL)
Granite::Connections << {name: "sqlite_with_replica", writer: ADAPTER_URL, reader: ADAPTER_REPLICA_URL, adapter_type: Granite::Adapter::Sqlite}
when Nil
raise "Please set CURRENT_ADAPTER"
else
raise "Unknown adapter #{CURRENT_ADAPTER}"
end

require "spec"
require "../src/granite"
Expand Down Expand Up @@ -32,6 +48,6 @@ end
{% if env("CURRENT_ADAPTER") == "mysql" && !flag?(:issue_473) %}
Spec.after_each do
# https://github.com/amberframework/granite/issues/473
Granite::Connections["mysql"].try &.database.pool.close
Granite::Connections["mysql"].not_nil![:writer].try &.database.pool.close
end
{% end %}
8 changes: 8 additions & 0 deletions spec/spec_models.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ end
{% begin %}
{% adapter_literal = env("CURRENT_ADAPTER").id %}

class ReplicatedChat < Granite::Base
connection {{ "#{adapter_literal}_with_replica" }}
table replicated_chats

column id : Int64, primary: true
column content : String
end

class Chat < Granite::Base
connection {{ adapter_literal }}
table chats
Expand Down
10 changes: 10 additions & 0 deletions src/granite/base.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require "./connections"
require "./integrators"
require "./converters"
require "./type"
require "./connection_management"

# Granite::Base is the base class for your model objects.
abstract class Granite::Base
Expand All @@ -29,6 +30,8 @@ abstract class Granite::Base
include Migrator
include Select

include ConnectionManagement

extend Columns::ClassMethods
extend Tables::ClassMethods
extend Granite::Migrator::ClassMethods
Expand Down Expand Up @@ -70,5 +73,12 @@ abstract class Granite::Base

disable_granite_docs? def initialize
end

crimson-knight marked this conversation as resolved.
Show resolved Hide resolved
before_save :switch_to_writer_adapter
before_destroy :switch_to_writer_adapter
after_save :update_last_write_time
after_save :schedule_adapter_switch
after_destroy :update_last_write_time
after_destroy :schedule_adapter_switch
end
end
89 changes: 89 additions & 0 deletions src/granite/connection_management.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
module Granite::ConnectionManagement
macro included
# Default value for the time a model waits before using a reader
# database connection for read operations
# all models use this value. Change it
# to change it in all Granite::Base models.
class_property connection_switch_wait_period : Int64 = Granite::Connections.connection_switch_wait_period
@@last_write_time = Time.monotonic

class_property current_adapter : Granite::Adapter::Base?
class_property reader_adapter : Granite::Adapter::Base = Granite::Connections.first_reader
class_property writer_adapter : Granite::Adapter::Base = Granite::Connections.first_writer

def self.last_write_time
@@last_write_time
end

# This is done this way because callbacks don't work on class mthods
def self.update_last_write_time
@@last_write_time = Time.monotonic
end

def update_last_write_time
self.class.update_last_write_time
end

def self.time_since_last_write
Time.monotonic - @@last_write_time
end

def time_since_last_write
self.class.time_since_last_write
end

def self.switch_to_reader_adapter
if time_since_last_write > @@connection_switch_wait_period.milliseconds
@@current_adapter = @@reader_adapter
end
end

def switch_to_reader_adapter
self.class.switch_to_reader_adapter
end

def self.switch_to_writer_adapter
@@current_adapter = @@writer_adapter
end

def switch_to_writer_adapter
self.class.switch_to_writer_adapter
end

def self.schedule_adapter_switch
spawn do
sleep @@connection_switch_wait_period.milliseconds
switch_to_reader_adapter
end

Fiber.yield
end

def schedule_adapter_switch
self.class.schedule_adapter_switch
end

def self.adapter
begin
@@current_adapter.not_nil!
rescue NilAssertionError
Granite::Connections.registered_connections.first?.not_nil![:writer]
end
end
end

macro connection(name)
{% name = name.id.stringify %}

error_message = "Connection #{{{name}}} not found in Granite::Connections.
Available connections are:

#{Granite::Connections.registered_connections.map{ |conn| "#{conn[:writer].name}"}.join(", ")}"

raise error_message if Granite::Connections[{{name}}].nil?

self.writer_adapter = Granite::Connections[{{name}}].not_nil![:writer]
self.reader_adapter = Granite::Connections[{{name}}].not_nil![:reader]
self.current_adapter = @@writer_adapter
end
end
34 changes: 29 additions & 5 deletions src/granite/connections.cr
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
module Granite
class Connections
class_getter registered_connections = [] of Granite::Adapter::Base
class_property connection_switch_wait_period : Int64 = 2000
class_getter registered_connections = [] of {writer: Granite::Adapter::Base, reader: Granite::Adapter::Base}
crimson-knight marked this conversation as resolved.
Show resolved Hide resolved

# Registers the given *adapter*. Raises if an adapter with the same name has already been registered.
def self.<<(adapter : Granite::Adapter::Base) : Nil
raise "Adapter with name '#{adapter.name}' has already been registered." if @@registered_connections.any? { |conn| conn.name == adapter.name }
@@registered_connections << adapter
raise "Adapter with name '#{adapter.name}' has already been registered." if @@registered_connections.any? { |conn| conn[:writer].name == adapter.name }
@@registered_connections << {writer: adapter, reader: adapter}
end

def self.<<(data : NamedTuple(name: String, reader: String, writer: String, adapter_type: Granite::Adapter::Base.class)) : Nil
raise "Adapter with name '#{data[:name]}' has already been registered." if @@registered_connections.any? { |conn| conn[:writer].name == data[:name] }

writer_adapter = data[:adapter_type].new(name: data[:name], url: data[:writer])

# if reader/writer reference the same db. Make them point to the same granite adapter.
# This avoids connection pool duplications on the same database.
if (data[:reader] == data[:writer])
return @@registered_connections << {writer: writer_adapter, reader: writer_adapter}
end

reader_adapter = data[:adapter_type].new(name: data[:name], url: data[:reader])
@@registered_connections << {writer: writer_adapter, reader: reader_adapter}
end

# Returns a registered connection with the given *name*, otherwise `nil`.
def self.[](name : String) : Granite::Adapter::Base?
registered_connections.find { |conn| conn.name == name }
def self.[](name : String) : {writer: Granite::Adapter::Base, reader: Granite::Adapter::Base}?
registered_connections.find { |conn| conn[:writer].name == name }
end

def self.first_writer
@@registered_connections.first?.not_nil![:writer]
end

def self.first_reader
@@registered_connections.first?.not_nil![:reader]
end
end
end
1 change: 1 addition & 0 deletions src/granite/query/builder.cr
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class Granite::Query::Builder(Model)
end

def delete
Model.switch_to_writer_adapter
assembler.delete
end

Expand Down
Loading