Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use one fsevent_watch process per listener instead of one per dir #470

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 24 additions & 38 deletions lib/listen/adapter/darwin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,59 +35,45 @@ def self.usable?

private

# NOTE: each directory gets a DIFFERENT callback!
def _configure(dir, &callback)
require 'rb-fsevent'
opts = { latency: options.latency }

@workers ||= ::Queue.new
@workers << FSEvent.new.tap do |worker|
_log :debug, "fsevent: watching: #{dir.to_s.inspect}"
worker.watch(dir.to_s, opts, &callback)
end
@callbacks[dir] = callback
end

def _run
tgxworld marked this conversation as resolved.
Show resolved Hide resolved
tgxworld marked this conversation as resolved.
Show resolved Hide resolved
tgxworld marked this conversation as resolved.
Show resolved Hide resolved
tgxworld marked this conversation as resolved.
Show resolved Hide resolved
first = @workers.pop

# NOTE: _run is called within a thread, so run every other
# worker in it's own thread
_run_workers_in_background(_to_array(@workers))
_run_worker(first)
require 'rb-fsevent'
worker = FSEvent.new
dirs_to_watch = @callbacks.keys.map(&:to_s)
_log(:info) { "fsevent: watching: #{dirs_to_watch.inspect}" }
tgxworld marked this conversation as resolved.
Show resolved Hide resolved
worker.watch(dirs_to_watch, { latency: options.latency }, &method(:_process_changes))
Listen::Internals::ThreadPool.add { _run_worker(worker) }
end

def _process_event(dir, event)
_log :debug, "fsevent: processing event: #{event.inspect}"
event.each do |path|
new_path = Pathname.new(path.sub(%r{\/$}, ''))
_log :debug, "fsevent: #{new_path}"
# TODO: does this preserve symlinks?
rel_path = new_path.relative_path_from(dir).to_s
_queue_change(:dir, dir, rel_path, recursive: true)
def _process_changes(dirs)
dirs.each do |dir|
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm aware there are two loops here but for the average use case, this is going to consist of only a couple of directories that change each time.

dir = Pathname.new(dir.sub(%r{\/$}, ''))

@callbacks.each do |watched_dir, callback|
tgxworld marked this conversation as resolved.
Show resolved Hide resolved
if watched_dir.eql?(dir) || Listen::Directory.ascendant_of?(watched_dir, dir)
callback.call(dir)
end
end
end
end

def _process_event(dir, path)
_log(:debug) { "fsevent: processing path: #{path.inspect}" }
# TODO: does this preserve symlinks?
rel_path = path.relative_path_from(dir).to_s
_queue_change(:dir, dir, rel_path, recursive: true)
end

def _run_worker(worker)
_log :debug, "fsevent: running worker: #{worker.inspect}"
_log(:debug) { "fsevent: running worker: #{worker.inspect}" }
worker.run
rescue
format_string = 'fsevent: running worker failed: %s:%s called from: %s'
_log_exception format_string, caller
end

def _run_workers_in_background(workers)
workers.each do |worker|
# NOTE: while passing local variables to the block below is not
# thread safe, using 'worker' from the enumerator above is ok
Listen::Internals::ThreadPool.add { _run_worker(worker) }
end
end

def _to_array(queue)
workers = []
workers << queue.pop until queue.empty?
workers
end
end
end
end
6 changes: 6 additions & 0 deletions lib/listen/directory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def self.scan(snapshot, rel_path, options)
raise
end

def self.ascendant_of?(base, other)
other.ascend do |ascendant|
break true if base == ascendant
tgxworld marked this conversation as resolved.
Show resolved Hide resolved
end
end

def self._async_changes(snapshot, path, previous, options)
fail "Not a Pathname: #{path.inspect}" unless path.respond_to?(:children)
previous.each do |entry, data|
Expand Down
126 changes: 0 additions & 126 deletions spec/lib/listen/adapter/darwin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,130 +69,4 @@
it { should eq 1234 }
end
end

describe 'multiple dirs' do
ioquatix marked this conversation as resolved.
Show resolved Hide resolved
let(:dir1) { fake_path('/foo/dir1', cleanpath: fake_path('/foo/dir1')) }
let(:dir2) { fake_path('/foo/dir2', cleanpath: fake_path('/foo/dir1')) }
let(:dir3) { fake_path('/foo/dir3', cleanpath: fake_path('/foo/dir1')) }

before do
allow(config).to receive(:queue).and_return(queue)
allow(config).to receive(:silencer).and_return(silencer)
end

let(:foo1) { double('fsevent1') }
let(:foo2) { double('fsevent2') }
let(:foo3) { double('fsevent3') }

before do
allow(FSEvent).to receive(:new).and_return(*expectations.values, nil)
expectations.each do |dir, obj|
allow(obj).to receive(:watch).with(dir.to_s, latency: 0.1)
end
end

describe 'configuration' do
before do
subject.configure
end

context 'with 1 directory' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }

let(:expectations) { { '/foo/dir1': foo1 } }

it 'configures directory' do
expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1)
end
end

context 'with 2 directories' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }
let(:expectations) { { dir1: foo1, dir2: foo2 } }

it 'configures directories' do
expect(foo1).to have_received(:watch).with('dir1', latency: 0.1)
expect(foo2).to have_received(:watch).with('dir2', latency: 0.1)
end
end

context 'with 3 directories' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }
let(:expectations) do
{
'/foo/dir1': foo1,
'/foo/dir2': foo2,
'/foo/dir3': foo3
}
end

it 'configures directories' do
expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1)
expect(foo2).to have_received(:watch).with('/foo/dir2', latency: 0.1)
expect(foo3).to have_received(:watch).with('/foo/dir3', latency: 0.1)
end
end
end

describe 'running threads' do
let(:running) { [] }
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }

before do
started = ::Queue.new
threads = ::Queue.new
left = ::Queue.new

# NOTE: Travis has a hard time creating threads on OSX
thread_start_overhead = 3
max_test_time = 3 * thread_start_overhead
block_time = max_test_time + thread_start_overhead

expectations.each do |name, _|
left << name
end

expectations.each do |_, obj|
allow(obj).to receive(:run) do
current_name = left.pop
threads << Thread.current
started << current_name
sleep block_time
end
end

Timeout.timeout(max_test_time) do
subject.start
sleep 0.1 until started.size == expectations.size
end

running << started.pop until started.empty?

killed = ::Queue.new
killed << threads.pop.kill until threads.empty?
killed.pop.join until killed.empty?
end

context 'with 1 directory' do
let(:expectations) { { dir1: foo1 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end

context 'with 2 directories' do
let(:expectations) { { dir1: foo1, dir2: foo2 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end

context 'with 3 directories' do
let(:expectations) { { dir1: foo1, dir2: foo2, dir3: foo3 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end
end
end
end