-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from colinquirk/add-eeg-support
Add eeg support, mock tracker object
- Loading branch information
Showing
7 changed files
with
291 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import socket | ||
import time | ||
|
||
import psychopy.event | ||
import psychopy.parallel | ||
import psychopy.visual | ||
|
||
|
||
class PyPlugger: | ||
def __init__(self, window, config_file, tcp_ip="100.1.1.3", | ||
tcp_port=6700, parallel_port_address=53328, text_color=None): | ||
self.window = window | ||
self.config_file = config_file | ||
self.tcp_ip = tcp_ip | ||
self.tcp_port = tcp_port | ||
self.current_mode = None | ||
self.socket = None | ||
|
||
psychopy.parallel.setPortAddress(parallel_port_address) | ||
psychopy.parallel.setData(0) | ||
|
||
if text_color is None: | ||
if all(i >= 0.5 for i in self.window.color): | ||
self.text_color = (-1, -1, -1) | ||
else: | ||
self.text_color = (1, 1, 1) | ||
else: | ||
self.text_color = text_color | ||
|
||
def initialize_session(self, experiment_name, subject_number, timeout=5): | ||
messages = ['1' + self.config_file, | ||
'2' + str(experiment_name), | ||
'3' + str(subject_number), | ||
'4'] | ||
|
||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
self.socket.settimeout(timeout) | ||
self.socket.connect((self.tcp_ip, self.tcp_port)) | ||
|
||
for tcp_message in messages: | ||
self.socket.send(tcp_message.encode()) | ||
time.sleep(1) | ||
|
||
def switch_mode(self, mode, delay=5): | ||
self.socket.send(mode.encode()) | ||
self.current_mode = mode | ||
time.sleep(delay) | ||
|
||
def start_recording(self, delay=5): | ||
self.socket.send("S".encode()) | ||
time.sleep(delay) # Ensure recording has started | ||
|
||
def stop_recording(self, delay=5, exit_mode=False): | ||
if exit_mode: | ||
cmd = 'X' | ||
else: | ||
cmd = 'Q' | ||
|
||
self.socket.send(cmd.encode()) | ||
time.sleep(delay) # Ensure recording has ended | ||
|
||
@staticmethod | ||
def start_event(event): | ||
psychopy.parallel.setData(event) | ||
|
||
@staticmethod | ||
def end_event(): | ||
"""To be called some time after an event has been sent""" | ||
psychopy.parallel.setData(0) | ||
|
||
def display_eeg_instructions(self, eeg_instruction_text=None): | ||
self.window.flip() | ||
|
||
if eeg_instruction_text is None: | ||
eeg_instruction_text = ('We will be recording EEG in this experiment. ' | ||
'In order to prevent artifacts due to muscle movements, please' | ||
' try to avoid moving your head and clenching your jaw.\n\n' | ||
'Please avoid blinking while performing the task. ' | ||
'Try to blink only in between trials.') | ||
|
||
psychopy.visual.TextStim(self.window, color=self.text_color, units='norm', pos=(0, 0.22), | ||
height=0.05, text=eeg_instruction_text).draw() | ||
|
||
psychopy.visual.TextStim(self.window, color=self.text_color, units='norm', pos=(0, -0.28), | ||
height=0.05, text='Press any key to continue.').draw() | ||
|
||
self.window.flip() | ||
psychopy.event.waitKeys() | ||
self.window.flip() | ||
|
||
def display_interactive_switch_screen(self, require_monitoring=True): | ||
switch_text = ('You may switch modes now.\n\n' | ||
'Press "M" for monitor mode.\n' | ||
'Press "I" for impedance mode.\n' | ||
'Press "Q" to continue with the experiment.') | ||
|
||
psychopy.visual.TextStim(self.window, switch_text, color=self.text_color).draw() | ||
self.window.flip() | ||
|
||
response = None | ||
while response != 'q': | ||
response = psychopy.event.waitKeys(keyList=['m', 'i', 'q'])[0] | ||
if response != 'q': | ||
self.switch_mode(response.upper()) | ||
|
||
if self.current_mode != 'M': | ||
self.switch_mode('M') | ||
|
||
self.window.flip() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import time | ||
|
||
from psychopy import monitors | ||
from psychopy import visual | ||
|
||
import pyplugger | ||
|
||
# Necessary psychopy setup | ||
monitor = monitors.Monitor('test_monitor', width=53, distance=70) | ||
monitor.setSizePix([1920, 1080]) | ||
|
||
win = visual.Window([800, 600], units="pix", color=[0, 0, 0], monitor=monitor) | ||
|
||
visual.TextStim(win, 'Beginning PyPlugger test...').draw() | ||
win.flip() | ||
|
||
# Initialization | ||
eeg = pyplugger.PyPlugger(win, config_file='C:\\Users\\AwhVogel\\Desktop\\Colin\\default.xml') | ||
|
||
eeg.initialize_session('test_exp', 0) | ||
print('Initalization tests passed...') | ||
|
||
eeg.display_eeg_instructions() | ||
eeg.display_interactive_switch_screen() # End in monitoring mode | ||
|
||
eeg.start_recording() | ||
for i in range(5): | ||
eeg.start_event(i) | ||
time.sleep(0.5) | ||
eeg.end_event() | ||
time.sleep(0.5) | ||
|
||
print('Basic functionality tests passed...') | ||
|
||
eeg.display_interactive_switch_screen() # End in monitoring mode | ||
|
||
# Minimum delay testing | ||
for i in range(5): | ||
eeg.start_event(i) | ||
time.sleep(0.05) | ||
eeg.end_event() | ||
time.sleep(0.05) | ||
eeg.stop_recording(exit_mode=True) | ||
time.sleep(5) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters