From 6c16cb2ec5132df5f1ac80645e1c4b877ac63d52 Mon Sep 17 00:00:00 2001 From: Alan Tan Date: Sun, 22 Sep 2019 12:59:51 -0400 Subject: [PATCH] Use one `fsevent_watch` process per listener instead of one per dir --- lib/listen/adapter/darwin.rb | 62 +++++------- lib/listen/directory.rb | 6 ++ spec/lib/listen/adapter/darwin_spec.rb | 126 ------------------------- 3 files changed, 30 insertions(+), 164 deletions(-) diff --git a/lib/listen/adapter/darwin.rb b/lib/listen/adapter/darwin.rb index 55574c07..cdf11346 100644 --- a/lib/listen/adapter/darwin.rb +++ b/lib/listen/adapter/darwin.rb @@ -35,59 +35,45 @@ def self.usable? private - # NOTE: each directory gets a DIFFERENT callback! def _configure(dir, &callback) - require 'rb-fsevent' - opts = { latency: options.latency } - - @workers ||= ::Queue.new - @workers << FSEvent.new.tap do |worker| - _log :debug, "fsevent: watching: #{dir.to_s.inspect}" - worker.watch(dir.to_s, opts, &callback) - end + @callbacks[dir] = callback end def _run - first = @workers.pop - - # NOTE: _run is called within a thread, so run every other - # worker in it's own thread - _run_workers_in_background(_to_array(@workers)) - _run_worker(first) + require 'rb-fsevent' + worker = FSEvent.new + dirs_to_watch = @callbacks.keys.map(&:to_s) + _log(:info) { "fsevent: watching: #{dirs_to_watch.inspect}" } + worker.watch(dirs_to_watch, { latency: options.latency }, &method(:_process_changes)) + Listen::Internals::ThreadPool.add { _run_worker(worker) } end - def _process_event(dir, event) - _log :debug, "fsevent: processing event: #{event.inspect}" - event.each do |path| - new_path = Pathname.new(path.sub(%r{\/$}, '')) - _log :debug, "fsevent: #{new_path}" - # TODO: does this preserve symlinks? - rel_path = new_path.relative_path_from(dir).to_s - _queue_change(:dir, dir, rel_path, recursive: true) + def _process_changes(dirs) + dirs.each do |dir| + dir = Pathname.new(dir.sub(%r{\/$}, '')) + + @callbacks.each do |watched_dir, callback| + if watched_dir.eql?(dir) || Listen::Directory.ascendant_of?(watched_dir, dir) + callback.call(dir) + end + end end end + def _process_event(dir, path) + _log(:debug) { "fsevent: processing path: #{path.inspect}" } + # TODO: does this preserve symlinks? + rel_path = path.relative_path_from(dir).to_s + _queue_change(:dir, dir, rel_path, recursive: true) + end + def _run_worker(worker) - _log :debug, "fsevent: running worker: #{worker.inspect}" + _log(:debug) { "fsevent: running worker: #{worker.inspect}" } worker.run rescue format_string = 'fsevent: running worker failed: %s:%s called from: %s' _log_exception format_string, caller end - - def _run_workers_in_background(workers) - workers.each do |worker| - # NOTE: while passing local variables to the block below is not - # thread safe, using 'worker' from the enumerator above is ok - Listen::Internals::ThreadPool.add { _run_worker(worker) } - end - end - - def _to_array(queue) - workers = [] - workers << queue.pop until queue.empty? - workers - end end end end diff --git a/lib/listen/directory.rb b/lib/listen/directory.rb index 873a7223..4a4d78ea 100644 --- a/lib/listen/directory.rb +++ b/lib/listen/directory.rb @@ -53,6 +53,12 @@ def self.scan(snapshot, rel_path, options) raise end + def self.ascendant_of?(base, other) + other.ascend do |ascendant| + break true if base == ascendant + end + end + def self._async_changes(snapshot, path, previous, options) fail "Not a Pathname: #{path.inspect}" unless path.respond_to?(:children) previous.each do |entry, data| diff --git a/spec/lib/listen/adapter/darwin_spec.rb b/spec/lib/listen/adapter/darwin_spec.rb index 7850abc9..8886b78f 100644 --- a/spec/lib/listen/adapter/darwin_spec.rb +++ b/spec/lib/listen/adapter/darwin_spec.rb @@ -69,130 +69,4 @@ it { should eq 1234 } end end - - describe 'multiple dirs' do - let(:dir1) { fake_path('/foo/dir1', cleanpath: fake_path('/foo/dir1')) } - let(:dir2) { fake_path('/foo/dir2', cleanpath: fake_path('/foo/dir1')) } - let(:dir3) { fake_path('/foo/dir3', cleanpath: fake_path('/foo/dir1')) } - - before do - allow(config).to receive(:queue).and_return(queue) - allow(config).to receive(:silencer).and_return(silencer) - end - - let(:foo1) { double('fsevent1') } - let(:foo2) { double('fsevent2') } - let(:foo3) { double('fsevent3') } - - before do - allow(FSEvent).to receive(:new).and_return(*expectations.values, nil) - expectations.each do |dir, obj| - allow(obj).to receive(:watch).with(dir.to_s, latency: 0.1) - end - end - - describe 'configuration' do - before do - subject.configure - end - - context 'with 1 directory' do - let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } } - - let(:expectations) { { '/foo/dir1': foo1 } } - - it 'configures directory' do - expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1) - end - end - - context 'with 2 directories' do - let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } } - let(:expectations) { { dir1: foo1, dir2: foo2 } } - - it 'configures directories' do - expect(foo1).to have_received(:watch).with('dir1', latency: 0.1) - expect(foo2).to have_received(:watch).with('dir2', latency: 0.1) - end - end - - context 'with 3 directories' do - let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } } - let(:expectations) do - { - '/foo/dir1': foo1, - '/foo/dir2': foo2, - '/foo/dir3': foo3 - } - end - - it 'configures directories' do - expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1) - expect(foo2).to have_received(:watch).with('/foo/dir2', latency: 0.1) - expect(foo3).to have_received(:watch).with('/foo/dir3', latency: 0.1) - end - end - end - - describe 'running threads' do - let(:running) { [] } - let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } } - - before do - started = ::Queue.new - threads = ::Queue.new - left = ::Queue.new - - # NOTE: Travis has a hard time creating threads on OSX - thread_start_overhead = 3 - max_test_time = 3 * thread_start_overhead - block_time = max_test_time + thread_start_overhead - - expectations.each do |name, _| - left << name - end - - expectations.each do |_, obj| - allow(obj).to receive(:run) do - current_name = left.pop - threads << Thread.current - started << current_name - sleep block_time - end - end - - Timeout.timeout(max_test_time) do - subject.start - sleep 0.1 until started.size == expectations.size - end - - running << started.pop until started.empty? - - killed = ::Queue.new - killed << threads.pop.kill until threads.empty? - killed.pop.join until killed.empty? - end - - context 'with 1 directory' do - let(:expectations) { { dir1: foo1 } } - it 'runs all the workers without blocking' do - expect(running.sort).to eq(expectations.keys) - end - end - - context 'with 2 directories' do - let(:expectations) { { dir1: foo1, dir2: foo2 } } - it 'runs all the workers without blocking' do - expect(running.sort).to eq(expectations.keys) - end - end - - context 'with 3 directories' do - let(:expectations) { { dir1: foo1, dir2: foo2, dir3: foo3 } } - it 'runs all the workers without blocking' do - expect(running.sort).to eq(expectations.keys) - end - end - end - end end