Skip to content
This repository has been archived by the owner on Sep 16, 2020. It is now read-only.

Create %Toniq.Job{} struct instead of using a map #49

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions lib/toniq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ defmodule Toniq do
Enqueue job to be run in the background as soon as possible
"""
def enqueue(worker_module, arguments \\ []) do
Toniq.JobPersistence.store_job(worker_module, arguments)
worker_module
|> Toniq.Job.new(arguments)
|> Toniq.JobPersistence.store_job()
|> Toniq.JobRunner.register_job()
end

Expand All @@ -32,7 +34,8 @@ defmodule Toniq do
"""
def enqueue_with_delay(worker_module, arguments, options) do
worker_module
|> Toniq.JobPersistence.store_delayed_job(arguments, options)
|> Toniq.Job.new(arguments, options)
|> Toniq.JobPersistence.store_delayed_job()
|> Toniq.DelayedJobTracker.register_job()
end

Expand Down
7 changes: 6 additions & 1 deletion lib/toniq/delayed_job_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ defmodule Toniq.DelayedJobTracker do
{:noreply, remaining_jobs}
end

defp has_expired?(%{options: nil}), do: true
defp has_expired?(job) do
job.delayed_until != :infinity and job.delayed_until <= :os.system_time(:milli_seconds)
delayed_until = Keyword.get(job.options, :delayed_until)

delayed_until != nil
and delayed_until != :infinity
and delayed_until <= :os.system_time(:milli_seconds)
end
end
37 changes: 31 additions & 6 deletions lib/toniq/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,35 @@ defmodule Toniq.Job do
# NOTE: If the format changes: add migration code for older formats
@job_format_version 1

def build(id, worker_module, arguments, options \\ []) do
%{id: id, worker: worker_module, arguments: arguments, version: @job_format_version}
|> add_delay(options)
alias Toniq.Job

# Job fields:
#   id        - redis-assigned sequence number (assigned via set_id/2 when stored)
#   worker    - module that performs the work
#   arguments - arguments handed to the worker
#   version   - job format version, used by migrate/1 for older formats
#   options   - keyword list of options (e.g. :delayed_until for delayed jobs)
#   vm        - identifier of the VM that owns the job (see add_vm_identifier/2)
#   error     - failure reason, set via set_error/2 when the job fails
defstruct [
  :id,
  :worker,
  :arguments,
  :version,
  :options,
  :vm,
  :error
]

@doc """
Builds a new `%Toniq.Job{}` for the given worker module.

`arguments` are handed to the worker when the job runs. `options` is an
optional keyword list; a `:delay_for` entry is converted into a
`:delayed_until` timestamp by `add_delay/1`.

Raises if the worker module cannot be loaded, so a misspelled module name
fails at enqueue time instead of when the job is later performed.
"""
def new(worker_module, arguments, options \\ nil) do
  # Fail fast on unknown modules; the message names the module to aid debugging.
  if not Code.ensure_loaded?(worker_module) do
    raise "Worker #{inspect(worker_module)} does not exist"
  end

  %Job{
    worker: worker_module,
    arguments: arguments,
    version: @job_format_version,
    options: add_delay(options)
  }
end

# Assigns the redis-generated id to a job (done by JobPersistence when storing).
def set_id(job, id), do: %{job | id: id}

# Tags the job with the identifier of the VM that owns it.
def add_vm_identifier(job, identifier), do: %{job | vm: identifier}

# Records the failure reason on a job; passing nil clears a previous error.
def set_error(job, error), do: %{job | error: error}

def migrate(job), do: migrate_v0_jobs_to_v1(job)

# Convert from the pre-1.0 format. Replace this with the migration from 1 to 2 when you add job_format_version 2. We keep it to be able to test format migration.
Expand Down Expand Up @@ -35,12 +59,13 @@ defmodule Toniq.Job do
end
end

defp add_delay(job, options) do
defp add_delay(nil), do: nil
defp add_delay(options) do
options
|> Keyword.get(:delay_for)
|> case do
nil -> job
delay -> job |> Map.put(:delayed_until, delay |> to_expiry)
nil -> options
delay -> Keyword.put(options, :delayed_until, delay |> to_expiry)
end
end

Expand Down
68 changes: 45 additions & 23 deletions lib/toniq/job_persistence.ex
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
defmodule Toniq.JobPersistence do
import Exredis.Api
alias Toniq.Job

@doc """
Stores a job in redis. If it does not succeed it will fail right away.
"""
def store_job(worker_module, arguments, identifier \\ default_identifier()) do
store_job_in_key(worker_module, arguments, jobs_key(identifier), identifier)
def store_job(job, identifier \\ default_identifier()) do
store_job_in_key(job, jobs_key(identifier), identifier)
end

# Only used in tests
def store_incoming_job(worker_module, arguments, identifier \\ default_identifier()) do
store_job_in_key(worker_module, arguments, incoming_jobs_key(identifier), identifier)
def store_incoming_job(job, identifier \\ default_identifier()) do
store_job_in_key(job, incoming_jobs_key(identifier), identifier)
end

@doc """
Stores a delayed job in redis.
"""
def store_delayed_job(worker_module, arguments, options, identifier \\ default_identifier()) do
store_job_in_key(worker_module, arguments, delayed_jobs_key(identifier), identifier, options)
def store_delayed_job(job, identifier \\ default_identifier()) do
store_job_in_key(job, delayed_jobs_key(identifier), identifier)
end

# Only used internally by JobImporter
def remove_from_incoming_jobs(job) do
redis() |> srem(incoming_jobs_key(default_identifier()), strip_vm_identifier(job))
redis() |> srem(incoming_jobs_key(default_identifier()), prepare_for_redis(job))
end

@doc """
Expand Down Expand Up @@ -53,20 +54,20 @@ defmodule Toniq.JobPersistence do
"""
def mark_as_successful(job, identifier \\ default_identifier()) do
redis()
|> srem(jobs_key(identifier), strip_vm_identifier(job))
|> srem(jobs_key(identifier), prepare_for_redis(job))
end

@doc """
Marks a job as failed. This removes the job from the regular list and stores it in the failed jobs list.
"""
def mark_as_failed(job, error, identifier \\ default_identifier()) do
job_with_error = Map.put(job, :error, error)
job_with_error = Job.set_error(job, error)

redis()
|> Exredis.query_pipe([
["MULTI"],
["SREM", jobs_key(identifier), strip_vm_identifier(job)],
["SADD", failed_jobs_key(identifier), strip_vm_identifier(job_with_error)],
["SREM", jobs_key(identifier), prepare_for_redis(job)],
["SADD", failed_jobs_key(identifier), prepare_for_redis(job_with_error)],
["EXEC"]
])

Expand All @@ -79,13 +80,13 @@ defmodule Toniq.JobPersistence do
Uses "job.vm" to do the operation in the correct namespace.
"""
def move_failed_job_to_incomming_jobs(job_with_error) do
job = Map.delete(job_with_error, :error)
job = Job.set_error(job_with_error, nil)

redis()
|> Exredis.query_pipe([
["MULTI"],
["SREM", failed_jobs_key(job.vm), strip_vm_identifier(job_with_error)],
["SADD", incoming_jobs_key(job.vm), strip_vm_identifier(job)],
["SREM", failed_jobs_key(job.vm), prepare_for_redis(job_with_error)],
["SADD", incoming_jobs_key(job.vm), prepare_for_redis(job)],
["EXEC"]
])

Expand All @@ -101,8 +102,8 @@ defmodule Toniq.JobPersistence do
redis()
|> Exredis.query_pipe([
["MULTI"],
["SREM", delayed_jobs_key(delayed_job.vm), strip_vm_identifier(delayed_job)],
["SADD", incoming_jobs_key(delayed_job.vm), strip_vm_identifier(delayed_job)],
["SREM", delayed_jobs_key(delayed_job.vm), prepare_for_redis(delayed_job)],
["SADD", incoming_jobs_key(delayed_job.vm), prepare_for_redis(delayed_job)],
["EXEC"]
])

Expand All @@ -116,7 +117,7 @@ defmodule Toniq.JobPersistence do
"""
def delete_failed_job(job) do
redis()
|> srem(failed_jobs_key(job.vm), strip_vm_identifier(job))
|> srem(failed_jobs_key(job.vm), prepare_for_redis(job))
end

def jobs_key(identifier) do
Expand All @@ -135,13 +136,15 @@ defmodule Toniq.JobPersistence do
identifier_scoped_key(:incoming_jobs, identifier)
end

defp store_job_in_key(worker_module, arguments, key, identifier, options \\ []) do
defp store_job_in_key(job, key, identifier) do
job_id = redis() |> incr(counter_key())

job =
Toniq.Job.build(job_id, worker_module, arguments, options) |> add_vm_identifier(identifier)
job
|> Job.set_id(job_id)
|> Job.add_vm_identifier(identifier)

redis() |> sadd(key, strip_vm_identifier(job))
redis() |> sadd(key, prepare_for_redis(job))
job
end

Expand All @@ -151,15 +154,34 @@ defmodule Toniq.JobPersistence do
|> Enum.map(&build_job/1)
|> Enum.sort(&first_in_first_out/2)
|> Enum.map(fn job -> convert_to_latest_job_format(job, redis_key) end)
|> Enum.map(fn job -> add_vm_identifier(job, identifier) end)
|> Enum.map(fn job ->
error = Map.has_key?(job, :error) && job.error || nil
options = Map.has_key?(job, :options) && job.options || nil

%Job{
id: job.id,
worker: job.worker,
arguments: job.arguments,
version: 1,
options: options,
error: error,
vm: identifier
}
end)
end

defp build_job(data) do
:erlang.binary_to_term(data)
end

def add_vm_identifier(job, identifier), do: job |> Map.put(:vm, identifier)
def strip_vm_identifier(job), do: job |> Map.delete(:vm)
# Converts a job struct into the plain map stored as a redis set member.
# Drops the :vm field and any nil-valued fields so the serialized term
# matches what was originally written (redis set members are compared
# byte-for-byte, so SREM must receive the exact stored shape).
defp prepare_for_redis(job) do
  for {field, value} <- Map.from_struct(job),
      value != nil,
      field != :vm,
      into: %{} do
    {field, value}
  end
end

defp convert_to_latest_job_format(loaded_job, redis_key) do
case Toniq.Job.migrate(loaded_job) do
Expand Down
29 changes: 19 additions & 10 deletions test/toniq/delayed_job_tracker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Toniq.DelayedJobTrackerTest do
use ExUnit.Case
use Retry

alias Toniq.{DelayedJobTracker, JobPersistence}
alias Toniq.{DelayedJobTracker, JobPersistence, Job}

defmodule TestWorker do
use Toniq.Worker
Expand All @@ -18,10 +18,12 @@ defmodule Toniq.DelayedJobTrackerTest do

test "imports delayed jobs on start" do
TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: 250)
|> Job.new(%{some: "data"}, delay_for: 250)
|> JobPersistence.store_delayed_job()

TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: 500)
|> Job.new([some: "data"], [delay_for: 500])
|> JobPersistence.store_delayed_job()

assert JobPersistence.delayed_jobs() |> Enum.count() == 2

Expand All @@ -34,7 +36,8 @@ defmodule Toniq.DelayedJobTrackerTest do

test "imports delayed jobs on takeover (it calls reload_job_list)" do
TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: 250)
|> Job.new([some: "data"], [delay_for: 250])
|> JobPersistence.store_delayed_job()

assert JobPersistence.delayed_jobs() |> Enum.count() == 1

Expand All @@ -49,11 +52,13 @@ defmodule Toniq.DelayedJobTrackerTest do
DelayedJobTracker.start_link(:test_delayed_job_tracker)

TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: 250)
|> Job.new([some: "data"], [delay_for: 250])
|> JobPersistence.store_delayed_job()
|> DelayedJobTracker.register_job()

TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: 500)
|> Job.new([some: "data"], [delay_for: 500])
|> JobPersistence.store_delayed_job()
|> DelayedJobTracker.register_job()

assert JobPersistence.delayed_jobs() |> Enum.count() == 2
Expand All @@ -67,11 +72,13 @@ defmodule Toniq.DelayedJobTrackerTest do
DelayedJobTracker.start_link(:test_delayed_job_tracker)

TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: :infinity)
|> Job.new([some: "data"], [delay_for: :infinity])
|> JobPersistence.store_delayed_job()
|> DelayedJobTracker.register_job()

TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: :infinity)
|> Job.new([some: "data"], [delay_for: :infinity])
|> JobPersistence.store_delayed_job()
|> DelayedJobTracker.register_job()

assert JobPersistence.delayed_jobs() |> Enum.count() == 2
Expand All @@ -85,11 +92,13 @@ defmodule Toniq.DelayedJobTrackerTest do
DelayedJobTracker.start_link(:test_delayed_job_tracker)

TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: :infinity)
|> Job.new([some: "data"], [delay_for: :infinity])
|> JobPersistence.store_delayed_job()
|> DelayedJobTracker.register_job()

TestWorker
|> JobPersistence.store_delayed_job(%{some: "data"}, delay_for: 10_000)
|> Job.new(%{some: "data"}, delay_for: 10_000)
|> JobPersistence.store_delayed_job()
|> DelayedJobTracker.register_job()

assert JobPersistence.delayed_jobs() |> Enum.count() == 2
Expand Down
5 changes: 4 additions & 1 deletion test/toniq/failover_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Ensure we can failover jobs from one VM to another when it exits or crashes
defmodule Toniq.FailoverTest do
use ExUnit.Case
alias Toniq.Job

setup do
Process.whereis(:toniq_redis) |> Exredis.query(["FLUSHDB"])
Expand Down Expand Up @@ -40,7 +41,9 @@ defmodule Toniq.FailoverTest do
end

defp add_job(identifier) do
Toniq.JobPersistence.store_job(FakeWorker, [], identifier)
FakeWorker
|> Job.new([])
|> Toniq.JobPersistence.store_job(identifier)
end

defp start_keepalive(name) do
Expand Down
5 changes: 4 additions & 1 deletion test/toniq/job_importer_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Exredis.JobImporterTest do
use ExUnit.Case
alias Toniq.Job

defmodule TestWorker do
use Toniq.Worker
Expand All @@ -17,7 +18,9 @@ defmodule Exredis.JobImporterTest do
@tag :capture_log
test "imports jobs from the incoming_jobs queue" do
Process.register(self(), :toniq_job_importer_test)
Toniq.JobPersistence.store_incoming_job(TestWorker, data: 10)
TestWorker
|> Job.new([])
|> Toniq.JobPersistence.store_incoming_job()

assert_receive :job_has_been_run, 1000
# wait for job to be removed
Expand Down
Loading