Skip to content

Commit

Permalink
Use one fsevent_watch process per listener instead of one per dir
Browse files Browse the repository at this point in the history
  • Loading branch information
tgxworld committed Dec 5, 2019
1 parent e844336 commit 5764592
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 166 deletions.
3 changes: 3 additions & 0 deletions lib/listen/adapter/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ def _timed(title)
raise
end

def _configure(dir, &callback)
end

# TODO: allow backend adapters to pass specific invalidation objects
# e.g. Darwin -> DirRescan, INotify -> MoveScan, etc.
def _queue_change(type, dir, rel_path, options)
Expand Down
60 changes: 23 additions & 37 deletions lib/listen/adapter/darwin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,59 +35,45 @@ def self.usable?

private

# NOTE: each directory gets a DIFFERENT callback!
def _configure(dir, &callback)
def _run
require 'rb-fsevent'
worker = FSEvent.new
opts = { latency: options.latency }
dirs_to_watch = config.directories.map(&:to_s)
_log(:info) { "fsevent: watching: #{dirs_to_watch.inspect}" }

worker.watch(dirs_to_watch, opts) do |dirs|
dirs.each do |dir|
dir = Pathname.new(dir.sub(%r{\/$}, ''))

@workers ||= ::Queue.new
@workers << FSEvent.new.tap do |worker|
_log :debug, "fsevent: watching: #{dir.to_s.inspect}"
worker.watch(dir.to_s, opts, &callback)
@callbacks.each do |watched_dir, callback|
if watched_dir.eql?(dir) || Listen::Directory.ascendant_of?(watched_dir, dir)
callback.call(dir)
end
end
end
end
end

def _run
first = @workers.pop
Listen::Internals::ThreadPool.add { _run_worker(worker) }
end

# NOTE: _run is called within a thread, so run every other
# worker in it's own thread
_run_workers_in_background(_to_array(@workers))
_run_worker(first)
def _process_changes(watched_dirs, dirs)
end

def _process_event(dir, event)
_log :debug, "fsevent: processing event: #{event.inspect}"
event.each do |path|
new_path = Pathname.new(path.sub(%r{\/$}, ''))
_log :debug, "fsevent: #{new_path}"
# TODO: does this preserve symlinks?
rel_path = new_path.relative_path_from(dir).to_s
_queue_change(:dir, dir, rel_path, recursive: true)
end
def _process_event(dir, path)
_log(:debug) { "fsevent: processing path: #{path.inspect}" }
# TODO: does this preserve symlinks?
rel_path = path.relative_path_from(dir).to_s
_queue_change(:dir, dir, rel_path, recursive: true)
end

def _run_worker(worker)
_log :debug, "fsevent: running worker: #{worker.inspect}"
_log(:debug) { "fsevent: running worker: #{worker.inspect}" }
worker.run
rescue
format_string = 'fsevent: running worker failed: %s:%s called from: %s'
_log_exception format_string, caller
end

def _run_workers_in_background(workers)
workers.each do |worker|
# NOTE: while passing local variables to the block below is not
# thread safe, using 'worker' from the enumerator above is ok
Listen::Internals::ThreadPool.add { _run_worker(worker) }
end
end

def _to_array(queue)
workers = []
workers << queue.pop until queue.empty?
workers
end
end
end
end
6 changes: 6 additions & 0 deletions lib/listen/directory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def self.scan(snapshot, rel_path, options)
raise
end

def self.ascendant_of?(base, other)
other.ascend do |ascendant|
break true if base == ascendant
end
end

def self._async_changes(snapshot, path, previous, options)
fail "Not a Pathname: #{path.inspect}" unless path.respond_to?(:children)
previous.each do |entry, data|
Expand Down
129 changes: 2 additions & 127 deletions spec/lib/listen/adapter/darwin_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# This is just so stubs work
require 'rb-fsevent'

require 'tmpdir'
require 'fileutils'
require 'listen/adapter/darwin'

include Listen
Expand Down Expand Up @@ -69,130 +70,4 @@
it { should eq 1234 }
end
end

describe 'multiple dirs' do
let(:dir1) { fake_path('/foo/dir1', cleanpath: fake_path('/foo/dir1')) }
let(:dir2) { fake_path('/foo/dir2', cleanpath: fake_path('/foo/dir1')) }
let(:dir3) { fake_path('/foo/dir3', cleanpath: fake_path('/foo/dir1')) }

before do
allow(config).to receive(:queue).and_return(queue)
allow(config).to receive(:silencer).and_return(silencer)
end

let(:foo1) { double('fsevent1') }
let(:foo2) { double('fsevent2') }
let(:foo3) { double('fsevent3') }

before do
allow(FSEvent).to receive(:new).and_return(*expectations.values, nil)
expectations.each do |dir, obj|
allow(obj).to receive(:watch).with(dir.to_s, latency: 0.1)
end
end

describe 'configuration' do
before do
subject.configure
end

context 'with 1 directory' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }

let(:expectations) { { '/foo/dir1': foo1 } }

it 'configures directory' do
expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1)
end
end

context 'with 2 directories' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }
let(:expectations) { { dir1: foo1, dir2: foo2 } }

it 'configures directories' do
expect(foo1).to have_received(:watch).with('dir1', latency: 0.1)
expect(foo2).to have_received(:watch).with('dir2', latency: 0.1)
end
end

context 'with 3 directories' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }
let(:expectations) do
{
'/foo/dir1': foo1,
'/foo/dir2': foo2,
'/foo/dir3': foo3
}
end

it 'configures directories' do
expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1)
expect(foo2).to have_received(:watch).with('/foo/dir2', latency: 0.1)
expect(foo3).to have_received(:watch).with('/foo/dir3', latency: 0.1)
end
end
end

describe 'running threads' do
let(:running) { [] }
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }

before do
started = ::Queue.new
threads = ::Queue.new
left = ::Queue.new

# NOTE: Travis has a hard time creating threads on OSX
thread_start_overhead = 3
max_test_time = 3 * thread_start_overhead
block_time = max_test_time + thread_start_overhead

expectations.each do |name, _|
left << name
end

expectations.each do |_, obj|
allow(obj).to receive(:run) do
current_name = left.pop
threads << Thread.current
started << current_name
sleep block_time
end
end

Timeout.timeout(max_test_time) do
subject.start
sleep 0.1 until started.size == expectations.size
end

running << started.pop until started.empty?

killed = ::Queue.new
killed << threads.pop.kill until threads.empty?
killed.pop.join until killed.empty?
end

context 'with 1 directory' do
let(:expectations) { { dir1: foo1 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end

context 'with 2 directories' do
let(:expectations) { { dir1: foo1, dir2: foo2 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end

context 'with 3 directories' do
let(:expectations) { { dir1: foo1, dir2: foo2, dir3: foo3 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end
end
end
end
2 changes: 0 additions & 2 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ def ci?
# See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration
RSpec.configure do |config|
config.order = :random
config.filter_run focus: true
config.run_all_when_everything_filtered = true
# config.fail_fast = !ci?
config.expect_with :rspec do |c|
c.syntax = :expect
Expand Down

0 comments on commit 5764592

Please sign in to comment.