-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpeyetribe.py
665 lines (515 loc) · 24.1 KB
/
peyetribe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
"""
Simple python (2 and 3) interface to the Eye Tribe eye tracker (http://theeyetribe.com)
See README.md for instructions
Created by Per Baekgaard / [email protected] / [email protected], March 2014
Licensed under the MIT License:
Copyright (c) 2014, Per Baekgaard, Technical University of Denmark, DTU Informatics, Cognitive Systems Section
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without
limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
__author__ = "Per Baekgaard"
__copyright__ = \
"Copyright (c) 2014, Per Baekgaard, Technical University of Denmark, DTU Informatics, Cognitive Systems Section"
__license__ = "MIT"
__version__ = "0.4"
__email__ = "[email protected]"
__status__ = "Alpha"
import sys
if sys.version_info[0] == 2:
import Queue as q
else:
import queue as q
import time
from datetime import datetime
import threading
import socket
import json
class EyeTribe(object):
    """
    Main class to handle the Eye Tracker interface and values.

    Includes the nested classes Frame (an entire tracker frame with both-eye
    positions), Frame.Eye (single-eye data) and Coord ((x,y) coordinates of
    eyes and bounding boxes), plus Calibration and CalibrationPoint which hold
    the outcome of the latest calibration.

    All classes derive from object so that the property setters also work on
    Python 2, where old-style classes silently ignore them.
    """

    # Canned JSON requests; sent to the tracker verbatim or after %-substitution.
    etm_get_init = '{ "category": "tracker", "request" : "get", "values": [ "iscalibrated", "heartbeatinterval" ] }'

    etm_calib = '{ "category": "calibration", "request" : "start", "values": { "pointcount": %d } }'
    etm_calib_abort = '{ "category": "calibration", "request" : "abort" }'
    etm_calib_clear = '{ "category": "calibration", "request" : "clear" }'
    etm_cpstart = '{ "category": "calibration", "request" : "pointstart", "values": { "x": %d, "y": %d } }'
    etm_cpend = '{ "category": "calibration", "request" : "pointend" }'

    etm_get_screenres = '{ "category": "tracker", "request" : "get", "values": [ "screenresw", "screenresh" ] }'

    etm_set_push = '{ "category": "tracker", "request" : "set", "values": { "push": true } }'
    etm_set_pull = '{ "category": "tracker", "request" : "set", "values": { "push": false } }'
    etm_get_frame = '{ "category": "tracker", "request" : "get", "values": [ "frame" ] }'

    etm_heartbeat = '{ "category": "heartbeat" }'

    # Maximum number of bytes requested from the socket per recv() call.
    etm_buffer_size = 4096

    class Coord(object):
        """Single (x,y) positions relative to screen or bounding box. Used in Frame and Calibration."""

        def __init__(self, x=0, y=0, ssep=';', fmt="%d"):
            """
            Store the coordinate pair.

            ssep separates x and y when the coordinate is converted to a string,
            and fmt is the %-format applied to each of the two values.
            """
            self._x = x
            self._y = y
            self._ssep = ssep
            self._fmt = fmt

        @property
        def x(self):
            """The horizontal cartesian offset (abscissa)."""
            return self._x

        @x.setter
        def x(self, val):
            self._x = val

        @property
        def y(self):
            """The vertical cartesian offset (ordinate)."""
            return self._y

        @y.setter
        def y(self, val):
            self._y = val

        def __str__(self):
            return (self._fmt + "%s" + self._fmt) % (self._x, self._ssep, self._y)

    class Frame(object):
        """
        Holds a complete decoded frame from the eye tracker.

        Access members via the properties or convert to string via str(...)
        """

        class Eye(object):
            """Single-eye data, including gaze coordinates and pupil size"""

            def __init__(self, raw, avg, psize, pcenter, ssep=';'):
                """
                Store the data of one eye.

                raw, avg and pcenter are converted with str() when printing,
                psize is printed with one decimal; ssep separates the fields.
                """
                self._raw = raw
                self._avg = avg
                self._psize = psize
                self._pcenter = pcenter
                self._ssep = ssep

            @property
            def raw(self):
                """The raw (unfiltered) cartesian eye coordinate vs screen coordinates."""
                return self._raw

            @raw.setter
            def raw(self, val):
                self._raw = val

            @property
            def avg(self):
                """The averaged (filtered) cartesian eye coordinate vs screen coordinates."""
                return self._avg

            @avg.setter
            def avg(self, val):
                self._avg = val

            @property
            def psize(self):
                """A relative estimate of the pupil size."""
                return self._psize

            @psize.setter
            def psize(self, val):
                self._psize = val

            @property
            def pcenter(self):
                """The center coordinate of the eye within the bounding box."""
                return self._pcenter

            @pcenter.setter
            def pcenter(self, val):
                self._pcenter = val

            def __str__(self):
                return "%s%s%s%s%.1f%s%s" % \
                    (str(self._raw), self._ssep, str(self._avg), self._ssep, self._psize, self._ssep, str(self._pcenter))

        def __init__(self, json, ssep=';'):
            """
            Creates a frame based on an unpacked version of the eye tracker json string.

            The ssep is used for separating values when the frame is converted to
            a string, as in a print statement. This is useful for dumping csv files.
            It is handed on to every contained coordinate and eye so that the whole
            line uses one and the same separator.

            Raises KeyError if an expected key is missing from the dict and
            ValueError if the 'timestamp' string does not have the expected layout.
            """
            def coord(point, fmt="%d"):
                return EyeTribe.Coord(point['x'], point['y'], ssep, fmt)

            def single_eye(data):
                return EyeTribe.Frame.Eye(
                    coord(data['raw']),
                    coord(data['avg']),
                    data['psize'],
                    coord(data['pcenter'], "%.3f"),
                    ssep
                )

            self._json = json
            self._etime = time.time()
            self._time = json['time'] / 1000.0
            ts = datetime.strptime(json['timestamp'], "%Y-%m-%d %H:%M:%S.%f")
            # mktime() only has whole-second resolution, so the fraction is added separately
            self._timestamp = int(time.mktime(ts.timetuple())) + ts.microsecond / 1000000.0
            self._fix = json['fix']
            self._state = json['state']
            self._raw = coord(json['raw'])
            self._avg = coord(json['avg'])
            self._lefteye = single_eye(json['lefteye'])
            self._righteye = single_eye(json['righteye'])
            self._ssep = ssep

        @property
        def json(self):
            """The 'original' json dict from the eye tracker -- for the curious or for debugging"""
            return self._json

        @property
        def etime(self):
            """The wall-time epoch at the point when the frame is unpacked on the client."""
            return self._etime

        @etime.setter
        def etime(self, val):
            self._etime = val

        @property
        def time(self):
            """A monotonic clock value from the tracker."""
            return self._time

        @time.setter
        def time(self, val):
            self._time = val

        @property
        def timestamp(self):
            """The wall-time epoch at the point the eye tracker server created the frame."""
            return self._timestamp

        @timestamp.setter
        def timestamp(self, val):
            self._timestamp = val

        @property
        def fix(self):
            """The fixation flag (True or False) from the eye tracker."""
            return self._fix

        @fix.setter
        def fix(self, val):
            self._fix = val

        @property
        def state(self):
            """The state from the eye tracker (a numeric value)."""
            return self._state

        @state.setter
        def state(self, val):
            self._state = val

        @property
        def avg(self):
            """An averaged fixation coordinate based on both eyes."""
            return self._avg

        @avg.setter
        def avg(self, val):
            self._avg = val

        @property
        def raw(self):
            """The raw (unfiltered) fixation coordinate based on both eyes."""
            return self._raw

        @raw.setter
        def raw(self, val):
            self._raw = val

        @property
        def lefteye(self):
            """Left eye coordinates, pupil position and size."""
            return self._lefteye

        @lefteye.setter
        def lefteye(self, val):
            self._lefteye = val

        @property
        def righteye(self):
            """Right eye coordinates, pupil position and size."""
            return self._righteye

        @righteye.setter
        def righteye(self, val):
            self._righteye = val

        def eye(self, left=False):
            """Return the left eye data if left is true, otherwise the right eye data."""
            if left:
                return self._lefteye
            return self._righteye

        def __str__(self):
            # header = "eT;dT;aT;Fix;State;Rwx;Rwy;Avx;Avy;LRwx;LRwy;LAvx;LAvy;LPSz;LCx;LCy;RRwx;RRwy;RAvx;RAvy;RPSz;RCx;RCy"
            # One letter per set state bit (0x10 down to 0x01), '.' for a cleared bit
            st = ''.join(
                letter if (self._state & mask) else '.'
                for letter, mask in (('L', 0x10), ('F', 0x08), ('P', 0x04), ('E', 0x02), ('G', 0x01))
            )
            f = 'F' if self._fix else 'N'
            s = "%014.3f%s%07.3f%s%07.3f%s" % (self._etime, self._ssep, self._time, self._ssep, self._timestamp, self._ssep,)
            s += "%s%s%s%s%s%s%s" % (f, self._ssep, st, self._ssep, str(self._raw), self._ssep, str(self._avg))
            s += "%s%s" % (self._ssep, str(self._lefteye))
            s += "%s%s" % (self._ssep, str(self._righteye))
            return s

    class Calibration(object):
        """Result of the latest calibration, as filled in by calibration_point_end."""

        def __init__(self):
            self.result = False     # 'result' value of the tracker's calibresult
            self.deg = None         # 'deg' value of the calibresult
            self.degl = None        # 'degl' value of the calibresult
            self.degr = None        # 'degr' value of the calibresult
            self.pointcount = 0
            self.points = None      # list of CalibrationPoint once a result has been received

    class CalibrationPoint(object):
        """Per-point calibration data; field names mirror the keys of the tracker reply."""

        def __init__(self):
            self.state = -1
            self.cp = EyeTribe.Coord()
            self.mecp = EyeTribe.Coord()
            self.ad = None
            self.adl = None
            self.adr = None
            self.mep = None
            self.mepl = None
            self.mepr = None
            self.asd = None
            self.asdl = None
            self.asdr = None

    def __init__(self, host='localhost', port=6555, ssep=';', screenindex=0):
        """
        Create an EyeTribe connection object that can be used to connect to an eye tracker.

        Parameters host and port are the values to use when connecting to the tracker.

        The ssep can be used to specify an alternative value for value separators when
        printing out a value.

        No connection is made here; call connect() for that.
        """
        self._host = host
        self._port = port
        self._sock = None
        self._ispushmode = False
        self._hbinterval = 0    # Note: this is (converted to a value in) seconds
        self._hbeater = None
        self._listener = None
        self._frameq = q.Queue()
        self._replyq = q.Queue()
        self._reply_lock = threading.Semaphore()    # Keeps track of whether someone needs a reply
        self._pmcallback = None
        self._ssep = ssep
        self._screenindex = screenindex
        self._calibres = EyeTribe.Calibration()

    def _tell_tracker(self, message):
        """
        Send the (canned) message to the tracker and return the reply properly parsed.

        Raises an exception if we get an error message back from the tracker (anything status!=200),
        if the listener thread has not been started, or if a reply is already queued before we ask.
        """
        if not self._listener:
            raise Exception("Internal error; listener is not running so we cannot get replies from the tracker!")

        if not self._replyq.empty():
            raise Exception("Tracker protocol error; we have a queue reply before asking for something: %s" % (self._replyq.get()))

        # lock semaphore to ensure we're the only ones opening a request that expects a reply from the tracker
        self._reply_lock.acquire()
        try:
            self._sock.send(message.encode())
            reply = self._replyq.get(True)
        finally:
            # always release, also when send() fails; otherwise every later request would deadlock
            self._reply_lock.release()

        sc = reply['statuscode']
        if sc != 200:
            raise Exception("Tracker protocol error (%d) on message '%s'" % (sc, message))

        return reply

    def connect(self):
        """
        Connect an eyetribe object to the actual Eye Tracker by establishing a TCP/IP connection.

        Also gets heartbeatinterval information, and sets up the heartbeater and listener threads

        Raises an exception if the object is already connected. If the TCP connection
        itself cannot be established the socket error is passed on and the object
        stays unconnected, so that connect() can simply be retried.
        """
        if self._sock is not None:
            raise Exception("cannot connect an already connected socket; close it first")

        def _hbeater_thread():
            """sends heartbeats at the required interval until the connection is closed, but does not read any replies"""
            sys.stderr.write("_hbeater starting\n")
            while True:
                sock = self._sock   # snapshot; close() may reset self._sock at any moment
                if not sock:
                    break
                try:
                    sock.send(EyeTribe.etm_heartbeat.encode())
                except (socket.error, OSError):
                    if self._sock:
                        raise
                    break   # the socket was closed under us by close(); just stop
                time.sleep(self._hbinterval)
            sys.stderr.write("_hbeater ending\n")

        def _dispatch(f, js):
            """handle heartbeat and calibration OK results, and store other stuff to proper queues"""
            sc = f['statuscode']
            if f['category'] == "heartbeat":
                return
            if f['category'] == 'calibration' and sc == 800:
                return
            if self._ispushmode and 'values' in f and 'frame' in f['values']:
                if sc != 200:
                    raise Exception("Connection failed, protocol error (%d)" % sc)
                ef = EyeTribe.Frame(f['values']['frame'], self._ssep)
                callback = self._pmcallback
                dont_queue = callback(ef) if callback is not None else False
                if not dont_queue:
                    self._frameq.put(ef)
                return
            # use semaphore to verify someone is waiting for a reply and give it to them (or fail!)
            if self._reply_lock.acquire(False):
                self._reply_lock.release()
                raise Exception("Connection protocol error; got reply but no-one asked for it: %s" % js)
            self._replyq.put(f)

        def _listener_thread():
            """
            Listens for replies from the tracker (including heartbeat replies) and dispatches or deletes those as needed

            This is the only place where we listen for replies from the tracker

            Currently assumes there are continuous heartbeats, otherwise we will time out at some point...
            """
            sys.stderr.write("_listener starting\n")
            pending = b""   # bytes received so far that do not yet form a complete message
            while True:
                # Keep going until we're asked to terminate (or we timeout with an error)
                sock = self._sock   # snapshot; close() may reset self._sock at any moment
                if not sock:
                    break
                try:
                    r = sock.recv(EyeTribe.etm_buffer_size)
                except (socket.error, OSError):
                    # socket.error also covers socket.timeout, and on Python 2 is not an OSError
                    if self._sock:
                        raise Exception("The connection failed with a timeout or OSError; lost tracker connection?")
                    continue
                if not r:
                    # recv() returning nothing means the peer closed the connection; looping
                    # on would only spin, so stop (noisily unless we are closing ourselves)
                    if self._sock:
                        raise Exception("The connection was closed by the tracker; lost tracker connection?")
                    break

                # Multiple replies handled assuming non-documented \n is sent from the tracker
                chunks = (pending + r).split(b"\n")
                pending = chunks.pop()
                for chunk in chunks:
                    js = chunk.decode()
                    if js.strip() != "":
                        _dispatch(json.loads(js), js)

                # What follows the last \n is either a complete message sent without the
                # trailing \n, or the first part of a message split across two recv() calls
                if pending.strip():
                    try:
                        js = pending.decode()
                        f = json.loads(js)
                    except ValueError:
                        continue    # incomplete; keep it and wait for the remainder
                    pending = b""
                    _dispatch(f, js)
            sys.stderr.write("_listener ending\n")

        # only publish the socket once it is connected, so a failed attempt leaves no trace
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self._host, self._port))
            sock.settimeout(30)
        except Exception:
            sock.close()
            raise
        self._sock = sock

        # setup listener to picks up replies etc; is needed very early on for comms to work
        self._listener = threading.Thread(target=_listener_thread)
        self._listener.daemon = True
        self._listener.start()

        p = self._tell_tracker(EyeTribe.etm_get_init)
        self._hbinterval = int(p['values']['heartbeatinterval']) / 1000.0

        if self._hbinterval != 0:
            self._sock.settimeout(self._hbinterval*2)
            # setup heart-beat generator
            self._hbeater = threading.Thread(target=_hbeater_thread)
            self._hbeater.daemon = True
            self._hbeater.start()
        else:
            self._hbeater = None

    def bind(self, host='localhost', port=6555):
        """
        (Re)binds a non-connected Eye Tribe object to another host/port.

        Raises an exception if the object is currently connected.
        """
        if self._sock is not None:
            raise Exception("cannot (re)bind a connected socket; close it first")
        self._host = host
        self._port = port

    def close(self, quick=False):
        """
        Close TCP/IP connection, returning the object back to its starting condition.

        If quick is True, do NOT wait for the listener and heartbeat threads to stop.

        Raises an exception if the object is not connected, or if one of the threads
        does not stop within the time allowed.
        """
        if self._sock is None:
            raise Exception("cannot close an already closed connection")

        _s = self._sock
        # clearing self._sock first is what tells both threads to stop
        self._sock = None
        _s.close()

        # a new connection starts out in pull mode, so forget any push mode setup
        self._ispushmode = False
        self._pmcallback = None

        if not quick:
            # sync for listener to stop
            if self._listener is not None:
                self._listener.join(min((self._hbinterval*3, 30)))
                if self._listener.is_alive():
                    raise Exception("Listener thread did not terminate as expected; protocol error?")

            # and for the heartbeater to stop as well
            if self._hbinterval != 0 and self._hbeater is not None:
                self._hbeater.join(min((self._hbinterval*3, 10)))
                if self._hbeater.is_alive():
                    raise Exception("HeartBeater thread did not terminate as expected; protocol error?")

        self._listener = None
        self._hbeater = None

    def pushmode(self, callback=None):
        """
        Change to push mode, i.e. setup and start receiving tracking data

        Requires a connected tracker that also has been calibrated

        If callback is not given, frames are just stored to the queue and can be retrieved with
        the next() operation; otherwise the callback is invoked with the new frame as parameter.
        The callback can return True to indicate that no further processing should be done
        on the frame; otherwise the frame will be queued as normal for later retrieval by next().

        Note that the callback is called on the sockets listener thread!
        """
        # if already in pushmode, do nothing...
        if self._ispushmode:
            return

        if callback is not None:
            self._pmcallback = callback

        # Raise the flag BEFORE asking: the listener must already treat incoming frames
        # as pushed data when the first one arrives, or it rejects them as unsolicited replies
        self._ispushmode = True
        try:
            self._tell_tracker(EyeTribe.etm_set_push)
        except Exception:
            self._ispushmode = False
            raise

    def pullmode(self):
        """
        Change to pull mode, i.e. prompt by calling next() whenever you pull for a frame.

        Requires a connected tracker that also has been calibrated
        """
        if self._ispushmode:
            self._tell_tracker(EyeTribe.etm_set_pull)
            self._ispushmode = False
        self._pmcallback = None

    def next(self, block=True):
        """
        Returns the next (queued or pulled) dataset from the eyetracker.

        If block is False, and we're in pushmode and the queue is empty, None is returned immediately,
        otherwise we will wait for the next frame to arrive and return that
        """
        if self._ispushmode:
            try:
                return self._frameq.get(block)
            except q.Empty:
                return None

        p = self._tell_tracker(EyeTribe.etm_get_frame)
        return EyeTribe.Frame(p['values']['frame'], self._ssep)

    def get_screen_res(self):
        """Ask the tracker for its screen resolution and return it as a (width, height) tuple."""
        p = self._tell_tracker(EyeTribe.etm_get_screenres)
        values = p['values']
        return (values['screenresw'], values['screenresh'])

    def calibration_start(self, pointcount=9):
        """
        (Re)run the calibration procedure with pointcount points.

        Call calibration_point_start and calibration_point_end for each point when it displays on the screen.
        The result can be retrieved by latest_calibration_result after the calibration has completed.
        """
        self._tell_tracker(EyeTribe.etm_calib % pointcount)

    def calibration_point_start(self, x, y):
        """Tell the tracker that the calibration point at (x, y) is now being shown."""
        self._tell_tracker(EyeTribe.etm_cpstart % (x, y))

    def calibration_point_end(self):
        """
        Tell the tracker that the current calibration point is done.

        If the reply carries values, the calibration result in it is stored and can
        be retrieved with latest_calibration_result; otherwise nothing is changed.
        """
        p = self._tell_tracker(EyeTribe.etm_cpend)
        if 'values' not in p:
            return

        cr = p['values']['calibresult']
        res = self._calibres
        res.result = cr['result']
        res.deg = cr['deg']
        res.degl = cr['degl']
        res.degr = cr['degr']

        points = []
        for cp in cr['calibpoints']:
            pt = EyeTribe.CalibrationPoint()
            pt.state = cp['state']
            pt.cp = EyeTribe.Coord(cp['cp']['x'], cp['cp']['y'])
            # mecp has its own entry in the reply; it is not a copy of cp
            pt.mecp = EyeTribe.Coord(cp['mecp']['x'], cp['mecp']['y'])
            pt.ad = cp['acd']['ad']
            pt.adl = cp['acd']['adl']
            pt.adr = cp['acd']['adr']
            pt.mep = cp['mepix']['mep']
            pt.mepl = cp['mepix']['mepl']
            pt.mepr = cp['mepix']['mepr']
            pt.asd = cp['asdp']['asd']
            pt.asdl = cp['asdp']['asdl']
            pt.asdr = cp['asdp']['asdr']
            points.append(pt)
        res.points = points

    def calibration_abort(self):
        """Ask the tracker to abort the calibration."""
        self._tell_tracker(EyeTribe.etm_calib_abort)

    def calibration_clear(self):
        """Ask the tracker to clear the calibration."""
        self._tell_tracker(EyeTribe.etm_calib_clear)

    def latest_calibration_result(self):
        """Return the Calibration object holding the most recently received calibration result."""
        return self._calibres
if __name__ == "__main__":
    # Example usage -- this code is only executed if file is run directly
    # not when imported as a module, but it shows how to use this module:
    #
    #     from peyetribe import EyeTribe
    #     import time

    tracker = EyeTribe()
    tracker.connect()
    try:
        # Pull a single frame before switching to push mode; the frame itself is not used
        n = tracker.next()

        print("eT;dT;aT;Fix;State;Rwx;Rwy;Avx;Avy;LRwx;LRwy;LAvx;LAvy;LPSz;LCx;LCy;RRwx;RRwy;RAvx;RAvy;RPSz;RCx;RCy")

        tracker.pushmode()
        for _ in range(100):
            n = tracker.next()
            print(n)

        tracker.pullmode()
    finally:
        # always give the connection back, also when something above fails
        tracker.close()