Blinkreconstruct_dot_py_in_R
def_blinkreconstruct_recursive
def _blinkreconstruct_recursive(a, vt_start=10, vt_end=5, maxdur=500,
                                margin=10, gap_margin=20, gap_vt=10,
                                smooth_winlen=21, std_thr=3,
                                processed_blink_points=None):
    """Implements a recursive blink-reconstruction algorithm that is a big
    improvement over the original algorithm.
    """
The parameters passed into this function are:
a = the pupil trace (an array of pupil-size samples)
vt_start = velocity threshold for detecting the start of a blink, when the eye starts closing and pupil size drops rapidly; default 10
vt_end = velocity threshold for detecting the end of a blink, when the eye reopens; default 5
maxdur = maximum duration of a blink, in samples; default 500 (a really long blink can imply some other problem)
margin = extra buffer, in samples, added before and after each detected blink; default 10
gap_margin = margin around the trimmed data (to remove potential errors near detected gaps); default 20
gap_vt = velocity threshold for detecting gaps in the signal; default 10
smooth_winlen = window length for the smoother; default 21
std_thr = standard deviation threshold for trimming outliers; default 3
processed_blink_points = list of blinks that have already been processed, used to prevent infinite recursion; default None, which becomes an empty list on the first call
    if processed_blink_points is None:
        processed_blink_points = []

    def fnc_recursive(a):
        """Shortcut for recursive function call that retains all keywords."""
        return _blinkreconstruct_recursive(
            a, vt_start=vt_start, vt_end=vt_end, maxdur=maxdur, margin=margin,
            gap_margin=gap_margin, gap_vt=gap_vt, smooth_winlen=smooth_winlen,
            std_thr=std_thr, processed_blink_points=processed_blink_points)
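The nested fnc_recursive() helper is what lets the function call itself without spelling out all the parameters again. It is a closure: it captures the keyword arguments of the current call, so only the updated trace has to be passed in. It also hands the same processed_blink_points list to every recursive call, so all levels of the recursion share one record of the blinks that were already handled.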
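To see the closure pattern on its own, here is a minimal toy sketch. It is not the datamatrix code: clip_peaks, recurse, and their parameters are hypothetical names made up for illustration. It only shows how an inner function can re-call the outer one with the same keywords and a shared list.

```python
def clip_peaks(values, limit=10, step=3, seen=None):
    """Toy recursive function: lower the first value above `limit` by
    `step`, then recurse until no value exceeds the limit."""
    if seen is None:
        seen = []

    def recurse(values):
        # The closure captures limit, step and seen from the enclosing call,
        # so only the updated data has to be passed along.
        return clip_peaks(values, limit=limit, step=step, seen=seen)

    for i, v in enumerate(values):
        if v > limit:
            seen.append(i)            # shared across all recursion levels
            new_values = list(values)
            new_values[i] = v - step
            return recurse(new_values)
    return values, seen


result, touched = clip_peaks([4, 15, 8, 12], limit=10, step=3)
```

Using None as the default and creating the list inside the function (as the original code also does) avoids sharing one list between unrelated top-level calls.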
Velocity calculation and smoothing
    # Create a copy of the signal, smooth it, and calculate the velocity
    a = np.copy(a)
    try:
        strace = srs._smooth(a, winlen=smooth_winlen)
    except Exception as e:
        warn(e)
        strace = a
    vtrace = np.diff(strace)
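The above first copies the signal, so the caller's array is not modified. It then smooths the copy with srs._smooth, a private helper that takes a window length (smooth_winlen); as far as I can tell from the datamatrix documentation this is a Hanning-window smoother. Smoothing most likely serves to prevent false blink detections caused by small fluctuations. The try/except part means that if smoothing fails for any reason, a warning is issued and the unsmoothed copy is used instead of crashing the program.
np.diff(strace) calculates the difference between consecutive samples of the smoothed pupil signal. This approximates the velocity of pupil size from one timepoint to the next, in pupil-size units per sample. The result is one element shorter than the signal.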
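The smoothing and velocity steps can be tried on a synthetic trace. In this sketch, smooth_hanning is a stand-in that I wrote under the assumption that the smoother is a normalized Hanning window; it is not srs._smooth itself, and the trace values are made up.

```python
import numpy as np


def smooth_hanning(a, winlen=21):
    """Stand-in smoother (an assumption, not srs._smooth): convolve with a
    normalized Hanning window, padding the edges so the length is kept."""
    window = np.hanning(winlen)
    window /= window.sum()
    pad = winlen // 2
    padded = np.pad(a, pad, mode='edge')
    return np.convolve(padded, window, mode='valid')


# Synthetic pupil trace: constant 1000 with a simulated blink (signal
# drops to 0) in samples 100..119
trace = np.full(300, 1000.0)
trace[100:120] = 0.0

strace = smooth_hanning(trace, winlen=21)
vtrace = np.diff(strace)  # velocity: change in pupil size per sample
```

In this example the velocity trace is flat before the blink, strongly negative at blink onset, and strongly positive when the signal recovers, which is the pattern the thresholds vt_start and vt_end are meant to pick up.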
Detect blinks
    # Get the first occurring blink
    blink_points = _blink_points(vtrace, vt_start=vt_start, vt_end=vt_end,
                                 maxdur=maxdur, margin=margin)
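Above calls the function _blink_points() with the velocity trace we just calculated, the onset and offset thresholds (vt_start and vt_end), the maximum duration for blinks, and the margin around the blink that should also be considered for reconstruction. Judging from how the result is used further down, it returns either None (no blink found) or a pair of indices marking the start and end of the first blink.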
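The body of _blink_points() is not shown here, so the following is only a guess at how threshold-based detection on a velocity trace could work. find_first_blink is a hypothetical, simplified function; the real implementation may differ, in particular in how it treats events longer than maxdur.

```python
import numpy as np


def find_first_blink(vtrace, vt_start=10, vt_end=5, maxdur=500, margin=10):
    """Hypothetical simplification: return (istart, iend) of the first
    blink-like event in a velocity trace, or None if there is none."""
    # Onset: velocity drops below -vt_start (pupil size collapsing)
    onset = np.where(vtrace < -vt_start)[0]
    if len(onset) == 0:
        return None
    istart = onset[0]
    # Reversal: velocity rises above vt_end (eye reopening)
    reversal = np.where(vtrace[istart:] > vt_end)[0]
    if len(reversal) == 0:
        return None
    imid = istart + reversal[0]
    # Offset: velocity falls back to vt_end or below (signal settled)
    settled = np.where(vtrace[imid:] <= vt_end)[0]
    if len(settled) == 0:
        return None
    iend = imid + settled[0]
    # Assumption of this sketch: overly long events are not reported
    if iend - istart > maxdur:
        return None
    # Widen the window by the margin, staying inside the trace
    istart = max(0, istart - margin)
    iend = min(len(vtrace) - 1, iend + margin)
    return int(istart), int(iend)


# Velocity trace with a sharp drop at samples 50..54 and a recovery
# at samples 70..74
vtrace = np.zeros(200)
vtrace[50:55] = -40.0
vtrace[70:75] = 40.0

blink = find_first_blink(vtrace)
no_blink = find_first_blink(np.zeros(200))
```

Here the event runs from sample 50 to sample 75, and the margin of 10 widens that to (40, 85). A flat velocity trace gives None, which is the case the main function checks for next.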
    # If no blink exists, we trim the signal as a final operation and then
    # leave it.
    if blink_points is None:
        logger.debug('no more blinks')
        return _trim(a, vtrace, std_thr=std_thr, gap_margin=gap_margin,
                     gap_vt=gap_vt)
    if list(blink_points) in processed_blink_points:
        logger.warning('Blink reconstruction entered infinite loop. This '
                       'likely indicates noisy data. Aborting blink '
                       'reconstruction for this signal.')
        return a
    processed_blink_points.append(list(blink_points))
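If the above _blink_points() function did not find any blinks, the function logs 'no more blinks' and gives us the pupil data after using the function _trim() on it. This is the base case that ends the recursion.
If the detected blink has exactly the same start and end points as a blink that was already processed, the earlier interpolation evidently did not remove it, and the function would keep detecting and reconstructing the same blink forever. In that case the recursion is aborted with a warning that the data is probably noisy, and the signal is returned as it is at that point (without trimming).
Otherwise, the points of the current blink are appended to processed_blink_points, so that later recursive calls can recognize them.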
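The guard against endless recursion can be illustrated with a toy example. Everything in this sketch (repair_until_stable, detect, good_repair, broken_repair) is hypothetical and only mimics the control flow described above: stop cleanly when nothing is detected, abort when the same points show up twice.

```python
import numpy as np


def repair_until_stable(detect, repair, data, processed=None):
    """Toy version of the guard: stop when `detect` reports the same
    points twice, which means `repair` did not remove them."""
    if processed is None:
        processed = []
    points = detect(data)
    if points is None:
        return data, 'clean'
    # Convert to a plain list so that `in` compares element by element
    if list(points) in processed:
        return data, 'aborted'
    processed.append(list(points))
    return repair_until_stable(detect, repair, repair(data), processed)


def detect(data):
    """Return (first, last) index of the negative values, or None."""
    bad = np.where(data < 0)[0]
    return None if len(bad) == 0 else np.array([bad[0], bad[-1]])


def good_repair(data):
    out = data.copy()
    out[out < 0] = 0
    return out


def broken_repair(data):
    return data.copy()  # leaves the problem in place


data = np.array([1.0, -1.0, -1.0, 2.0])
_, status_ok = repair_until_stable(detect, good_repair, data)
_, status_bad = repair_until_stable(detect, broken_repair, data)
```

With the working repair the recursion ends normally; with the broken one the same points are detected on the second pass and the guard stops the recursion.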
Interpolation
    # If a blink exists, see if we can get four valid points around it for
    # cubic spline interpolation. If not, then we do linear interpolation.
    istart, iend = blink_points
    cubic_spline_points = _cubic_spline_points(vtrace, istart, iend)
    if cubic_spline_points is None:
        logger.debug('linear interpolation: {}'.format(str(blink_points)))
        interp_fnc = interp1d(blink_points, a[blink_points])
    else:
        logger.debug('cubic-spline interpolation: {}'.format(
            str(cubic_spline_points)))
        interp_fnc = interp1d(
            cubic_spline_points,
            a[cubic_spline_points],
            kind='cubic')
    interp_x = np.arange(istart, iend)
    interp_y = interp_fnc(interp_x)
    a[interp_x] = interp_y
    # Recursive call to self to continue cleaning up other blinks (if any)
    return fnc_recursive(a)
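Above asks the _cubic_spline_points() function for four valid points around the blink. If it finds them, the blink is reconstructed with cubic spline interpolation through those points (interp1d with kind='cubic'). If it returns None, the code falls back to linear interpolation between the start and end points of the blink. Either way, the interpolated values overwrite the samples from istart up to, but not including, iend. Then the function calls itself on the repaired trace in its recursive way, until no more blinks are found.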
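The two interpolation modes can be compared on a synthetic trace with scipy's interp1d, the same function the original code uses. The trace, the blink position, and especially the spacing of the four cubic anchor points are assumptions of this sketch; how _cubic_spline_points() actually picks its points is not shown in the code above.

```python
import numpy as np
from scipy.interpolate import interp1d

# Synthetic trace: a smooth curve with a simulated blink (zeros) in
# samples 40..59
x = np.arange(100)
clean = 1000 + 50 * np.sin(x / 15)
trace = clean.copy()
trace[40:60] = 0.0

# Blink window including a margin, so both edges are valid samples
istart, iend = 35, 65
interp_x = np.arange(istart, iend)

# Linear: straight line between the two edges of the window
linear_points = np.array([istart, iend])
linear = trace.copy()
linear[interp_x] = interp1d(linear_points, trace[linear_points])(interp_x)

# Cubic: needs four points; the outer two are placed 20 samples away
# here, which is an arbitrary choice for this example
cubic_points = np.array([istart - 20, istart, iend, iend + 20])
cubic = trace.copy()
cubic[interp_x] = interp1d(
    cubic_points, trace[cubic_points], kind='cubic')(interp_x)

# Largest deviation from the blink-free signal, for comparison
linear_error = np.abs(linear - clean).max()
cubic_error = np.abs(cubic - clean).max()
```

Both versions fill the blink with plausible values. The cubic version can follow the curvature of the surrounding signal, but it depends on four valid anchor points, which is presumably why the original code keeps linear interpolation as a fallback.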