-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
1292 lines (1207 loc) · 56.7 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
server.py
Interface between model and plexon
Developed by giljael
Version: 2015feb14 modified by salvadord
"""
from glob import glob
from socket import *
from neuron import h # for working with DP cells
from multiprocessing import Process, Manager, Queue, Value, Lock
from threading import Thread
import struct
import time
import os
import sys
import Queue
import array
import math
import numpy as np
import traceback
import shared as s
### Copied plexon config here
# Module-level configuration and shared state. Everything below is evaluated
# once, at import time.
# a queue between the server and the virtual arm model
# (filled by feedQueue() and drained by Event.callback())
queue = Manager().Queue()
currNeuronTime = Value('d', 0.0) # for NEURON time exchange
currQueueTime = Value('d', 0.0) # for queue time exchange
newCurrTime = Value('d', 0.0)
# guards reads/writes of currNeuronTime in feedQueue() and Event.callback()
lock = Lock()
# When there is no data from the plexon system,
# the client will send 'exit' to the server and
# terminate itself after timeOut * timeWnd time.
# ex) 100 * 500 = 50 seconds
timeOut = 500 #10000
# header ID
NODATA = 0
DATA = 1
EXIT = 2
ALIVE = 3
ACK = 4
INITDATA = 5
# channel start
CH_START = 1
# channel end
CH_END = 96
# unit end
UNIT_END = 4
# data size
NODATA_SIZE = 4 # header ID + timeInterval
DATA_SIZE = (CH_END + 1) * 8 # 1 for binning window. 8 is double in Matlab
EXIT_SIZE = 2 # only header ID
ACK_SIZE = 2 # only header ID
# For Latency measure
O_END = 1
O_SND = 2
O_RCV = 3
O_FLT = 4
#####################################################################
# Variables user may modify #
#####################################################################
# port number for sending initial data to the client from the server.
# When it is changed, initPort in client.m should be changed as well.
initPort = 7869
# port number for receiving spikes from the client
comPort = 9999
# Time window for binning and summing spikes
binWnd = 100 # (ms)
autorun = 1 # autorun- 0: interactive mode 1: autorun upto
# 0: Using monkey spike, 1: Using original data to update DP cells.
isClosedLoop = 0 # 0: open loop w/o arm, 1: closed loop with arm
isOriginal = 0 # 0: cliff's model + the server process, 1: Just Cliff's model
numPMd = CH_END
isCommunication = 1 # 0: read spikes from a file by the server, 1: get spikes through the queue
localFileRead = 1 # 0: real communication and spikes delivery through the queue
# 1: the queue test reading spikes in a file at the server side
# 0: No output, 1: print information to stdout as well as files
verbose = 0
timeMeasure = 0
#ackServerIP = "10.244.18.233" # windows at the ACIS Lab.
ackServerIP = "138.5.98.28" # windows at the Francis Lab. (XPC64)
isUdp = 0 # 0: TCP, 1: UDP
simMode = 0 # simMode- Offline mode(0), online mode(1)
# latency requirement (ms) compared against queue-vs-simulation lag in feedQueue();
# overwritten by the parameter-validation section according to isDp/simMode
LR = 1000e3
unsortedSpkFilter = 0 # 0: counts unsorted unit, 1: filter unsorted unit
syncSpkFilter = 0 # 0: default, no filtering, 1: filter spikes having identical timestamps, 2: filter spikes in a same time window.
isDp = 0 # 0: NSLOC, 1:DP
isLwc = 0 # 0: Heavy weight client, 1: light weight client
SPKNUM = 20 # number of spikes in a chunk
# NOTE(review): the code reads the spike count from index SPKSZ - 1 and the
# timeout time stamp from index SPKSZ - 2 of a queue item; the slot layout
# described in the trailing comment below should be confirmed against the client.
SPKSZ = 3 * SPKNUM + 1 # size of a List to store SPKNUM (CH_ID, UNIT_ID, TS), 1 for spikes in the chunk, and 1 for flag to indicate if spike remains in a chunk
TIMEOUT = 20 * 0.001 # expressed in seconds
# width of the synchrony window used by nrn_py_filterSyncSpk() when
# syncSpkFilter == 2; compared directly against spike time stamps
syncWnd = 0.001
# For latency measure
qTimeList = []
deqTimeList = []
# Debug logs are opened at import time and only when verbose is set; no close()
# for these handles appears in the part of the file reviewed here.
if verbose:
fdtime = open('SpikeTimeStamp.tsv', 'w')
fdtime.write('CH#\tUnit#\tTS\n')
fdtime2 = open('SpikeTimeStamp2.tsv', 'w')
fdtime2.write('CH#\tUnit#\tTS\tSimTS\n')
fdtime3 = open('log.tsv', 'w')
#####################################################################
# Parameter validation #
#####################################################################
# Reconcile the user-editable flags above so that dependent settings agree.
# Running only the original model implies no communication at all.
if isOriginal == 1:
isCommunication = 0
if isCommunication == 0 or simMode == 1:
localFileRead = 0 # queue test w/ no communication is useless
# Latency requirement: 500 for DP online, 50 for NSLOC online, and effectively
# unlimited (1000e3) offline.
if isDp == 1:
if simMode == 1:
LR = 500 # latency requirement (ms)
else: # NSLOC
if simMode == 1:
LR = 50
if simMode == 0:
LR = 1000e3
# Lowest unit id that is counted when binning: 0 keeps unsorted units,
# 1 drops them (see nrn_py_binSpk).
if unsortedSpkFilter == 0:
UNIT_START = 0
else :
UNIT_START = 1
# Print out server's information
def getServerInfo():
# Build a "/"-separated description of the active configuration from the
# module-level flags (simMode, isUdp, isDp, isLwc, unsortedSpkFilter,
# syncSpkFilter, verbose) and print it followed by "server is launched!!!".
# Takes no arguments and returns nothing.
if simMode == 1:
fname = "Online/"
else:
fname = "Offline/"
# transport
if isUdp == 1:
fname += "UDP/"
else:
fname += "TCP/"
# model, with "(Lwc)" / "(Hwc)" naming the client type selected by isLwc
if isDp == 1:
fname += "DP/binning("
if isLwc == 1:
fname += "Lwc)/"
else:
fname += "Hwc)/"
else:
fname += "NSLOC/"
# NOTE(review): it is not clear from this copy whether the two filter sections
# below apply to both models or only to the NSLOC branch - confirm.
if unsortedSpkFilter == 1:
fname += "unsortedSpkFilter("
if isLwc == 1:
fname += "Lwc)/"
else:
fname += "Hwc)/"
if syncSpkFilter:
fname += "syncSpkFilter("
if isLwc == 1:
fname += "Lwc)/"
else:
fname += "Hwc)/"
if verbose:
fname += "Verbose/"
print fname + "server is launched!!!"
# Find NetCon by gid for None->NetCon->NSLOC
def checkLocalIndexbyKey(dicName, key):
    """Return the value stored under ``key`` in ``dicName``, or -1 if absent.

    Args:
        dicName: subscriptable container (the callers pass a gid -> local
            index mapping).
        key: key to look up.

    Returns:
        ``dicName[key]`` when the lookup succeeds, otherwise -1.
    """
    try:
        return dicName[key]
    except (LookupError, TypeError):
        # LookupError covers a missing key or index; TypeError covers an
        # unhashable key or a container that cannot be subscripted. Unlike a
        # bare ``except`` this lets KeyboardInterrupt/SystemExit propagate.
        return -1
def feedQueue(qItem, theVerbose = 0):
    """Put ``qItem`` on the shared queue and drop old items while the simulation lags.

    The time of the item (ms) is taken from the item itself:

    * DP model (``isDp`` set): ``qItem[CH_END] * binWnd``.
    * NSLOC model: the spike count is read from ``qItem[SPKSZ - 1]``; the time
      is the stamp of the last spike, or ``qItem[SPKSZ - 2]`` for a timeout
      item (spike count 0). Both are multiplied by 1000 (sec -> ms).

    After the item is queued, and as long as more than one item is waiting,
    the item time is compared with the simulation time published in
    ``currNeuronTime``. While the item is more than ``LR`` ms ahead, the item
    at the head of the queue is discarded, unless the consumer removed one in
    the meantime.

    Args:
        qItem: indexable queue item laid out as described above.
        theVerbose: non-zero enables progress messages. The module-level
            ``verbose`` flag enables them as well, so existing callers that
            pass ``verbose`` see no change.
    """
    log = theVerbose or verbose
    if isDp:  # DP model
        currQTime = qItem[CH_END] * binWnd
    else:  # NSLOC model
        spkNum = qItem[SPKSZ - 1]
        if spkNum == 0:  # timeout item
            currQTime = qItem[SPKSZ - 2] * 1000  # sec -> ms
        else:
            currQTime = qItem[(spkNum - 1) * 3 + 2] * 1000  # sec -> ms
    queue.put(qItem)
    while True:
        qsize = queue.qsize()
        if qsize <= 1:
            break
        with lock:
            currSimTime = currNeuronTime.value
        if log:
            print("[feedQueue] currQTime: %s currSimTime:  %s LR:  %s"
                  % (currQTime, currSimTime, LR))
        if (currQTime - currSimTime) <= LR:  # simulation is fast enough
            break
        # Simulation is slow: discard only if the consumer has not taken an
        # item since the size was sampled.
        if qsize == queue.qsize():
            queue.get()
            if log:
                print("[feedQueue] An item is discared in Q")
#class Event(object):
class Event:
# Bridges the shared queue and the NEURON simulation. __init__ registers an
# FInitializeHandler that runs callback() on the rank where pc.id() == 0 and
# callbackSlave() on every other rank; each callback then re-schedules itself
# with h.cvode.event() while the next event time is <= duration.
# NOTE(review): `pc`, `duration` and `iteration` are not defined in the part of
# the file reviewed here - presumably module-level names set elsewhere; confirm.
def __init__(self, dir="data"):
# dir: directory used for the timing-measurement output files and, in file
# mode, for the spike input file.
self.dir = dir
if isDp == 1: # binned data
self.qItem = [0] * (CH_END + 1)
self.invl = binWnd
self.vec = h.mua # mua : vector in Hoc
else: # spikes
self.qItem = [0] * (SPKSZ + 1) # = 3 * SPKNUM + 1 + Serial Number
self.invl = 0.025
# rank 0 pulls from the queue; the other ranks receive via pc.broadcast
if pc.id() == 0:
self.fih = h.FInitializeHandler(1, self.callback)
else:
self.fih = h.FInitializeHandler(1, self.callbackSlave)
# for timing measurement
self.num = 0
self.write = 1
# for opto
# NOTE(review): `pc.id == 0` compares the method object itself with 0 (it is
# called as pc.id() everywhere else in this class), so this condition looks
# like it can never be true and self.fd would never be opened - confirm.
if isCommunication == 0 and pc.id == 0:
if isDp == 0: # read spikes from a file by the server w/o communication
self.fname = self.dir + "/spikePMd-6sec.tsv"
self.fd = open(self.fname, "r")
print "[Open ", self.fname, " for test]"
else: # DP
pass
# callback function for the DP (isDp = 1) and NSLOC (isDp = 0) model
# Variant used on ranks other than 0: in NSLOC mode the data arrives in s.vec
# through pc.broadcast(s.vec, 0) rather than from the queue.
def callbackSlave(self):
try:
if isCommunication == 0: # No queue in the server process
if isDp == 0: # NSLOC and spikes from a file w/o communication
pass
else: # DP
pass
# get an item from the queue. Raise exception when it fails
else: # DP and NSLOC with online and offline mode
try:
nextInvl = self.invl
currSimTime = h.t
newCurrTime.value = currSimTime
if isDp == 1:
self.qItem = queue.get()
else:
n = pc.broadcast(s.vec, 0)
#except Queue.Empty as e: # No item in Q
# NOTE(review): this bare except treats any failure above as "no item".
except: # No item in Q
if verbose:
print "[callbackSlave] No item in Q"
else:
if isDp == 0 and not n == 0: #NSLOC
#self.qItem = vec.to_python()
# NOTE(review): `timer` is not assigned anywhere in this method; unless a
# module-level `timer` exists elsewhere, the timeMeasure path raises NameError.
if timeMeasure and self.write:
deqTimeList.append(time.time() - timer)
self.num += 1
if self.num == iteration:
self.write = 0
if not os.path.exists(self.dir):
os.mkdir(self.dir)
fname = self.dir + "/NslocPullTimefile"
f1 = open(fname, "w+")
for item in deqTimeList:
f1.write("%s\n" % item)
f1.close()
print "=======NslocPulltime printed!!!"
# spike count of the broadcast chunk
spkNum = int(s.vec[SPKSZ - 1])
# update current neuron time
# NOTE(review): self.qItem is not refreshed from s.vec in this branch (the
# to_python() line above is commented out), so the timeout case below and the
# verbose prints read whatever self.qItem held before - confirm intent.
if spkNum == 0: # timeout item
currSimTime = self.qItem[SPKSZ - 2] * 1000 + h.t # sec -> ms
else:
#currSimTime = self.qItem[(spkNum - 1) * 3 + 2] * 1000 # sec -> ms
currSimTime = s.vec[(spkNum - 1) * 3 + 2] * 1000 # sec -> ms
# [0]:CH_ID, [1]:Unit_ID, [2]: Time_stamp
for i in range(spkNum):
#timeStamp = self.qItem[3 * i + 2] * 1000 # second -> ms
timeStamp = s.vec[3 * i + 2] * 1000 # second -> ms
if h.t <= timeStamp and timeStamp <= duration: # queue spikes in the NEURON queue and ignore old spikes
# Queue netcon event. CH_START = 1, CH_END = 96
#localIndex = checkLocalIndexbyKey(int(self.qItem[3 * i] - 1), innclDic)
# NOTE(review): bare `vec` is not defined in this method (s.vec is used on the
# surrounding lines); as written this looks like a NameError that the outer
# except swallows - confirm.
localIndex = checkLocalIndexbyKey(s.innclDic, int(vec[3 * i] - 1))
if not localIndex == -1:
#h.s.inncl.o(localIndex).event(timeStamp, 1)
s.inncl[localIndex].event(timeStamp, 1)
if verbose:
print "[CallbackSlave] ChID:", self.qItem[3 * i], "UnitID:", self.qItem[3 * i + 1], "Time:", self.qItem[3 * i + 2] * 1000, "t: ", h.t
#fdtime.write(str(int(self.qItem[3*i])) + '\t' +str(int(self.qItem[3*i+1])) + '\t' +str('{:g}'.format(self.qItem[3*i+2])) + '\n')
else: # spike ignored. For debug
if verbose:
print "[CallbackSlave] ChID:", self.qItem[3 * i], "UnitID:", self.qItem[3 * i + 1], "Time:", self.qItem[3 * i + 2], "t: ", h.t
#fdtime2.write(str(int(self.qItem[3*i])) + '\t' +str(int(self.qItem[3*i+1])) + '\t' +str('{:g}'.format(self.qItem[3*i+2])) + '\t' + str(h.t/1000) + '\n')
# move t to currSimTime
#if currSimTime > h.t and currSimTime < duration:
# only the online mode jumps the next event time forward to the data time
if currSimTime > h.t:
if simMode == 1: # online mode
newCurrTime.value = currSimTime
else:
currSimTime = h.t
newCurrTime.value = currSimTime
finally:
if verbose:
print "[CallbackSlave] currNeuronTime: ", currSimTime
except:
if isCommunication == 0:
self.fd.close()
# NOTE(review): traceback.print_exc() prints the traceback itself and returns
# None, so this statement additionally prints "None".
if verbose:
print "[CallbackSlave] exception occurs:", traceback.print_exc()
finally: # update current neuron time
if isCommunication == 0:
pass
else:
if verbose:
print('[callback.h.t End(id= %i)] %.3f' %(pc.id(), h.t))
# re-schedule this callback; nextInvl is only bound if the inner try got far
# enough to assign it
if newCurrTime.value + nextInvl <= duration:
h.cvode.event(newCurrTime.value + nextInvl, self.callbackSlave)
# callback function for the DP (isDp = 1) and NSLOC (isDp = 0) model
# Variant used on rank 0: pulls the next item from the queue (or, with
# isCommunication == 0, reads spikes from self.fd) and delivers it.
def callback(self):
try:
if isCommunication == 0: # No queue in the server process
if isDp == 0: # NSLOC and spikes from a file w/o communication
#for locals in self.fd:
# each line is split on whitespace into: type, channel, unit, time stamp (s)
# (the loop variable shadows the builtin `locals`)
for locals in self.fd:
localts = locals.split()
if localts[0] == '1': # spikes
if unsortedSpkFilter and localts[2] == '0': # filter unsorted spikes
continue
else:
tt = float(localts[3]) * 1000
if tt > h.t:
s.inncl[int(localts[1]) - 1].event(tt, 1)
if verbose:
fdtime.write(str(1) + '\t' + str(int(localts[1])) + '\t' +str(int(localts[2])) + '\t' +str('{:g}'.format(float(localts[3]))) + '\n')
else: # DP
pass
# get an item from the queue. Raise exception when it fails
else: # DP and NSLOC with online and offline mode
try:
nextInvl = self.invl
currSimTime = h.t
newCurrTime.value = currSimTime
if timeMeasure:
timer = time.time()
# DP blocks until an item is available; NSLOC polls without blocking and
# broadcasts what it got to the other ranks
if isDp == 1:
self.qItem = queue.get()
else:
self.qItem = queue.get(False) # Q has an item?
n = pc.broadcast(s.vec.from_python(self.qItem), 0) # convert python list to hoc vector
# `Queue` here is the module bound by `import Queue` at the top of the file,
# which rebinds the name also imported from multiprocessing
except Queue.Empty as e: # No item in Q
n = pc.broadcast(s.emptyVec, 0)
if verbose:
print "[callback] No item in Q"
else:
if isDp == 1:
if timeMeasure and self.write:
deqTimeList.append(time.time() - timer)
self.num += 1
if self.num == iteration:
self.write = 0
if not os.path.exists(self.dir):
os.mkdir(self.dir)
fname = self.dir + "/DpPullTimefile"
f1 = open(fname, "w+")
for item in deqTimeList:
f1.write("%s\n" % item)
f1.close()
print "=======DpPull time printed!!!"
# for debug
#if 0 and row > 0:#verbose:
# f_handle = file('spk_rcv.txt', 'a')
# for item in self.qItem:
# f_handle.write("%d " % item)
# f_handle.write("\n")
# f_handle.close()
# the last slot of a DP item holds the bin index; times binWnd gives ms
currSimTime = self.qItem[CH_END] * binWnd
if self.qItem[0] == -1: # timeout message in Q
print "[callback-DP] timeout message"
h.setUpdateDPzero() #h.updateDpWithZero()
else: # data in a queue item in Q
# Convert the python list to the hoc vector (h.mua)
self.vec = self.vec.from_python(self.qItem)
h.updateDpWithMua()
else: #NSLOC
if timeMeasure and self.write:
deqTimeList.append(time.time() - timer)
self.num += 1
if self.num == iteration:
self.write = 0
if not os.path.exists(self.dir):
os.mkdir(self.dir)
fname = self.dir + "/NslocPullTimefile"
f1 = open(fname, "w+")
for item in deqTimeList:
f1.write("%s\n" % item)
f1.close()
print "=======NslocPulltime printed!!!"
# NOTE(review): spkNum is used as range() bound and in index arithmetic below,
# so the queue item is assumed to carry it as an integer - confirm.
spkNum = self.qItem[SPKSZ - 1]
# update current neuron time
if spkNum == 0: # timeout item
currSimTime = self.qItem[SPKSZ - 2] * 1000 + h.t # sec -> ms
else:
currSimTime = self.qItem[(spkNum - 1) * 3 + 2] * 1000 # sec -> ms
# [0]:CH_ID, [1]:Unit_ID, [2]: Time_stamp
for i in range(spkNum):
timeStamp = self.qItem[3 * i + 2] * 1000 # second -> ms
if h.t <= timeStamp and timeStamp <= duration: # queue spikes in the NEURON queue and ignore old spikes
# Queue netcon event. CH_START = 1, CH_END = 96
localIndex = checkLocalIndexbyKey(s.innclDic, int(self.qItem[3 * i] - 1))
if not localIndex == -1:
s.inncl[localIndex].event(timeStamp, 1)
if verbose:
print "[Callback] ChID:", self.qItem[3 * i], "UnitID:", self.qItem[3 * i + 1], "Time:", self.qItem[3 * i + 2] * 1000, "t: ", h.t
#fdtime.write(str(int(self.qItem[3*i])) + '\t' +str(int(self.qItem[3*i+1])) + '\t' +str('{:g}'.format(self.qItem[3*i+2])) + '\n')
else: # spike ignored. For debug
if verbose:
print "[Callback] ChID:", self.qItem[3 * i], "UnitID:", self.qItem[3 * i + 1], "Time:", self.qItem[3 * i + 2], "t: ", h.t
#fdtime.write(str(int(self.qItem[3*i])) + '\t' +str(int(self.qItem[3*i+1])) + '\t' +str('{:g}'.format(self.qItem[3*i+2])) + '\t' + str('{:g}'.format(h.t/1000)) + '\n')
# move t to currSimTime
if currSimTime > h.t:# and currSimTime < duration:
if simMode == 1: # online mode
newCurrTime.value = currSimTime
else:
currSimTime = h.t
newCurrTime.value = currSimTime
finally:
# publish the simulation time that feedQueue() compares against
with lock:
currNeuronTime.value = currSimTime # (ms)
if verbose:
print "[Callback] currNeuronTime: ", currSimTime, nextInvl
except:
if isCommunication == 0:
self.fd.close()
# NOTE(review): traceback.print_exc() prints the traceback itself and returns
# None, so this statement additionally prints "None".
if verbose:
print "[Callback] exception occurs:", traceback.print_exc()
finally: # update current neuron time
if isCommunication == 0:
pass
else:
# re-schedule this callback; nextInvl is only bound if the inner try got far
# enough to assign it
if newCurrTime.value + nextInvl <= duration:
h.cvode.event(newCurrTime.value + nextInvl, self.callback)
if verbose:
print('[callback.h.t End(id= %i)] %.3f' %(pc.id(), h.t))
fdtime2.write('Invl: ' + str(nextInvl) + '\t' + 'h.t + Invl: ' + str(h.t + nextInvl) + '\n')
fdtime3.write('invl, h.t + nextInvl: ' + str(nextInvl) + ' ' + str(h.t + nextInvl) + '\n')
# Connect the Python server to the Plexon client.
# It waits to receive an "ALIVE" message from the client,
# then performs a handshake to exchange parameters.
def nrn_py_connectPlxClient():
# Handshake with the client over UDP on initPort, then open the data socket
# on comPort.
# Returns (Sock, 0, 0) for UDP, or (Sock, conn, addr) for TCP after accepting
# one connection. On any exception the type is printed and the process exits
# through sys.exit(0).
try:
# Set the socket parameter for initial handshaking with client
host = "" # all available interfaces
buf = 4096
addr = (host, initPort)
Sock = socket(AF_INET, SOCK_DGRAM)
Sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
Sock.bind(addr)
# receive ALIVE from the client
data, addr = Sock.recvfrom(buf)
print data
dataLen = len(data)
# number of unsigned shorts in the message; 'H' * dlen needs an int, which
# relies on Python 2 integer division
dlen = dataLen / 2
if dataLen == 2:
cdata = struct.unpack('H' * dlen, data)
if cdata[0] == ALIVE:
if verbose:
print "I received ALIVE"
# send INITDATA to the client
# the parameters are packed as unsigned shorts, in the order listed below
a = array.array('H')
a.extend([INITDATA, comPort, isUdp, isDp, isLwc, binWnd, timeOut, unsortedSpkFilter, syncSpkFilter, verbose])
Sock.sendto(a.tostring(), addr)
if verbose:
print "I sent INITDATA"
# wait for the client's acknowledgement
data, addr = Sock.recvfrom(buf)
dataLen = len(data)
dlen = dataLen / 2
cdata = struct.unpack('H' * dlen, data)
if cdata[0] == ACK:
print "Connection success!!!"
else:
print "ERROR: Connection failure!!!"
# NOTE(review): bare `raise` with no active exception; it still raises an
# error, which lands in the except clause below and ends in sys.exit(0).
raise
Sock.close()
# Create socket and bind to address for spike exchange
addr = (host, comPort)
if isUdp:
Sock = socket(AF_INET, SOCK_DGRAM)
Sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
Sock.bind(addr)
return (Sock, 0, 0)
else:
Sock = socket(AF_INET, SOCK_STREAM)
Sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
Sock.bind(addr)
Sock.listen(1)
# blocks until the client connects
conn, addr = Sock.accept()
return (Sock, conn, addr)
except:
# NOTE(review): the sockets opened above are not closed on this path, and the
# exit status is 0 even though the connection failed - confirm intent.
print "[nrn_py_connectPlxClient] exception occurs:", sys.exc_info()[0]
sys.exit(0)
# Clarify spikes and external events.
# length(cdata) = drows * 4 fields : |data type|ch#|unit#|TS|
# this function returns two arrays for spikes and external events
def nrn_py_clarifyEvt(cdataNp):
    """Split received rows into spikes and external events.

    Args:
        cdataNp: 2-D numpy array with one row per record and the columns
            ``|data type|ch#|unit#|TS|``.

    Returns:
        Tuple ``(spkArr, evtArr)``: the rows whose data type (column 0) is 1
        and the rows whose data type is 4. Both are empty ``(0, 4)`` arrays
        when the input has no rows or cannot be processed; in the latter case
        the exception type is printed.
    """
    spkArr = np.zeros((0, 4))
    evtArr = np.zeros((0, 4))
    try:
        row, col = cdataNp.shape
        if row > 0:
            dataType = cdataNp[:, 0]
            spkArr = cdataNp[dataType == 1]  # spikes
            evtArr = cdataNp[dataType == 4]  # external events
    except Exception:
        # Best effort: report and fall through with what we have, but do not
        # swallow KeyboardInterrupt/SystemExit as the bare except +
        # return-in-finally did.
        print("[nrn_py_clarifyEvt] exception occurs: %s" % (sys.exc_info()[0],))
    return (spkArr, evtArr)
# filter system events
def nrn_py_filterSysEvt(evtArr):
    """Keep only the event rows whose channel field (column 1) equals 257.

    Args:
        evtArr: 2-D numpy array with the columns ``|data type|ch#|unit#|TS|``.

    Returns:
        The matching rows, or an empty ``(0, 4)`` array when the input has no
        rows or cannot be processed (the exception type is then printed).
    """
    newEvtArr = np.zeros((0, 4))
    try:
        row, col = evtArr.shape
        if row > 0:
            newEvtArr = evtArr[evtArr[:, 1] == 257]
        if verbose:
            print("nrn_py_filterSysEvt returns")
    except Exception:
        # Best effort, but let KeyboardInterrupt/SystemExit propagate.
        print("[nrn_py_filterSysEvt] exception occurs: %s" % (sys.exc_info()[0],))
    return newEvtArr
# reorder spikes
# orderedArr = nrn_py_reorder(spkArr)
def nrn_py_reorder(spkArr):
    """Re-order spikes by time stamp, holding back one batch between calls.

    Rows that cannot be released yet are kept in the function attribute
    ``nrn_py_reorder.keptArr``. On each call the kept rows and ``spkArr`` are
    merged and sorted by time stamp (column 3). Rows with a time stamp up to
    and including the largest previously kept time stamp are returned; later
    rows become the new ``keptArr``.

    Args:
        spkArr: 2-D numpy array with the columns ``|data type|ch#|unit#|TS|``.

    Returns:
        The released rows in time-stamp order, or an empty ``(0, 4)`` array
        when nothing can be released or the input cannot be processed (the
        exception type is then printed).
    """
    if not hasattr(nrn_py_reorder, "keptArr"):
        nrn_py_reorder.keptArr = np.zeros((0, 4))  # state kept between calls
    # Bound before the try block so the return below can never hit an
    # unassigned name.
    orderedArr = np.zeros((0, 4))
    try:
        kept = nrn_py_reorder.keptArr
        # kept rows are sorted, so the last one carries the largest time stamp
        maxTS = kept[-1][3] if kept.shape[0] > 0 else 0
        newArr = np.concatenate((kept, spkArr), axis=0)
        if newArr.shape[0] > 0:
            # Stable sort: spikes with identical time stamps keep their
            # arrival order.
            newArr = newArr[newArr[:, 3].argsort(kind='mergesort')]
            nrn_py_reorder.keptArr = newArr[newArr[:, 3] > maxTS]
            orderedArr = newArr[newArr[:, 3] <= maxTS]
    except Exception:
        print("[nrn_py_reorder] exception occurs: %s" % (sys.exc_info()[0],))
    return orderedArr
# Filter sync spikes
# syncSpkFilter = 1: filter spikes having same timestamp
# syncSpkFilter = 2: filter spikes in a sync window [syncWnd * x, syncWnd * (x + 1))
def nrn_py_filterSyncSpk(spkArr):
    """Remove synchronous spikes according to the module flag ``syncSpkFilter``.

    * ``syncSpkFilter == 1``: drop every spike whose time stamp (column 3)
      equals that of the row directly before or after it.
    * ``syncSpkFilter == 2``: partition time into windows
      ``[syncWnd * j, syncWnd * (j + 1))`` and keep a spike only if it is the
      single spike in its window. Only windows that lie completely before the
      window of the last row are evaluated; the remaining rows are carried
      over in ``nrn_py_filterSyncSpk.FRem`` and the next window index in
      ``nrn_py_filterSyncSpk.jValue``.
    * any other value: prints "Wrong filtering options" and returns no rows.

    Rows are assumed to arrive in time-stamp order (the last row is taken as
    the newest one).

    Args:
        spkArr: 2-D numpy array with the columns ``|data type|ch#|unit#|TS|``.

    Returns:
        The rows that passed the filter, or an empty ``(0, 4)`` array. If
        processing fails, the exception type is printed and whatever was
        collected so far is returned.
    """
    if not hasattr(nrn_py_filterSyncSpk, "FRem"):
        nrn_py_filterSyncSpk.FRem = np.zeros((0, 4))
    if not hasattr(nrn_py_filterSyncSpk, "jValue"):
        nrn_py_filterSyncSpk.jValue = 0
    newFltdArr = np.zeros((0, 4))
    try:
        if syncSpkFilter == 1:
            row, col = spkArr.shape
            if row == 1:  # a single spike cannot be synchronous
                newFltdArr = spkArr
            elif row > 1:
                ts = spkArr[:, 3]
                sameAsNext = ts[:-1] == ts[1:]
                # A row is synchronous if it shares its time stamp with either
                # neighbour.
                repeated = np.zeros(row, dtype=bool)
                repeated[:-1] |= sameAsNext
                repeated[1:] |= sameAsNext
                newFltdArr = np.concatenate((newFltdArr, spkArr[~repeated]), axis=0)
        elif syncSpkFilter == 2:
            newArr = np.concatenate((nrn_py_filterSyncSpk.FRem, spkArr), axis=0)
            row, col = newArr.shape
            if row > 0:
                firstJ = nrn_py_filterSyncSpk.jValue
                maxTS = newArr[row - 1][3]
                # Find the window [syncWnd * (newJ - 1), syncWnd * newJ) that
                # holds maxTS. Searching upwards only means a time stamp older
                # than the current window leaves newJ unchanged instead of
                # looping forever.
                newJ = firstJ + 1
                while syncWnd * newJ <= maxTS:
                    newJ += 1
                # rows of the still-open window wait for the next call
                nrn_py_filterSyncSpk.FRem = newArr[newArr[:, 3] >= syncWnd * (newJ - 1)]
                for ii in range(firstJ, newJ - 1):
                    newArr = newArr[newArr[:, 3] >= syncWnd * ii]
                    inWindow = newArr[newArr[:, 3] < syncWnd * (ii + 1)]
                    if inWindow.shape[0] == 1:
                        newFltdArr = np.concatenate((newFltdArr, inWindow), axis=0)
                nrn_py_filterSyncSpk.jValue = newJ - 1
        else:
            print("Wrong filtering options")
    except Exception:
        print("[nrn_py_filterSyncSpk] exception occurs: %s" % (sys.exc_info()[0],))
    return newFltdArr
def nrn_py_filterUnsortedSpk(spkArr):
    """Drop the spikes whose unit field (column 2) is not greater than 0.

    Args:
        spkArr: 2-D numpy array with the columns ``|data type|ch#|unit#|TS|``.

    Returns:
        The rows with ``unit# > 0``, or an empty ``(0, 4)`` array when the
        input has no rows or cannot be processed (the exception type is then
        printed).
    """
    newFltdArr = np.zeros((0, 4))
    try:
        row, col = spkArr.shape
        if row > 0:
            newFltdArr = spkArr[spkArr[:, 2] > 0]
    except Exception:
        # Best effort, but let KeyboardInterrupt/SystemExit propagate.
        print("[nrn_py_filterUnsortedSpk] exception occurs: %s" % (sys.exc_info()[0],))
    return newFltdArr
##
# mua, binningComplete = nrn_py_binSpk(spkArr, binStart, timeoutFlag)
# Here mua is a numpy array, so it should be converted to python list.
def nrn_py_binSpk(spkArr, binStart, timeoutFlag):
    """Accumulate per-channel spike counts for the bin starting at ``binStart``.

    State is kept in function attributes between calls: ``binnedMsg`` (the
    counts of the bin in progress), ``dataHave`` (number of spikes counted in
    that bin) and ``BRem`` (spikes whose time stamp is at or beyond the end
    of the bin, carried over to the next call).

    A spike is counted when its time stamp lies in ``[binStart, binStart +
    binWnd)`` (both given in ms, compared in seconds), its channel lies in
    ``[CH_START, CH_END]`` and its unit lies in ``[UNIT_START, UNIT_END]``.
    The bin is complete once a spike beyond the bin end has been seen or
    ``timeoutFlag`` is 1.

    Args:
        spkArr: 2-D numpy array with the columns ``|data type|ch#|unit#|TS|``,
            with the time stamp in seconds.
        binStart: start of the current bin in ms.
        timeoutFlag: 1 forces the bin to be closed.

    Returns:
        Tuple ``(mua, binningComplete)``. ``mua`` is a ``(CH_END + 1, 1)``
        numpy array: the counts of the finished bin, or all zeros when the
        bin is not finished or held no spikes. ``binningComplete`` is 1 when
        the bin was closed by this call, else 0. If processing fails, the
        exception type is printed and the values computed so far are returned.
    """
    # Bound before the try block so the return below can never hit an
    # unassigned name.
    newBinnedMsg = np.zeros((CH_END + 1, 1))
    binningComplete = 0
    try:
        if not hasattr(nrn_py_binSpk, "binnedMsg"):
            nrn_py_binSpk.binnedMsg = np.zeros((CH_END + 1, 1))  # 96 channel + binWnd
        if not hasattr(nrn_py_binSpk, "dataHave"):
            nrn_py_binSpk.dataHave = 0
        if not hasattr(nrn_py_binSpk, "BRem"):
            nrn_py_binSpk.BRem = np.zeros((0, 4))  # spikes
        binEnd = (binStart + binWnd) / 1000.0  # ms -> sec
        binStart = binStart / 1000.0
        # merge the spikes carried over from the previous call
        newArr = np.concatenate((nrn_py_binSpk.BRem, spkArr), axis=0)
        nrn_py_binSpk.BRem = np.zeros((0, 4))
        if newArr.shape[0] > 0:
            nrn_py_binSpk.BRem = newArr[newArr[:, 3] >= binEnd]
            newArr = newArr[newArr[:, 3] < binEnd]
            channelID = newArr[:, 1]
            unitID = newArr[:, 2]
            timeStamp = newArr[:, 3]
            valid = ((binStart <= timeStamp)
                     & (CH_START <= channelID) & (channelID <= CH_END)
                     & (UNIT_START <= unitID) & (unitID <= UNIT_END))
            # Channel ids arrive as floats; numpy requires integer indices.
            slots = channelID[valid].astype(int) - 1
            if slots.size > 0:
                nrn_py_binSpk.dataHave += int(slots.size)
                # Update spike counts/ch (MUA); add.at accumulates repeated
                # channels correctly.
                np.add.at(nrn_py_binSpk.binnedMsg[:, 0], slots, 1)
        if nrn_py_binSpk.BRem.shape[0] > 0 or timeoutFlag == 1:
            binningComplete = 1
            if nrn_py_binSpk.dataHave:
                newBinnedMsg = nrn_py_binSpk.binnedMsg
            nrn_py_binSpk.dataHave = 0
            nrn_py_binSpk.binnedMsg = np.zeros((CH_END + 1, 1))
    except Exception:
        print("[nrn_py_binSpk] exception occurs: %s" % (sys.exc_info()[0],))
    return (newBinnedMsg, binningComplete)
##
# The LWC server with the Lightweight client which sends raw data to the server
# port: port number for UDP server
# binWnd: choose proper time window in ms. Ex) 100, 200, ...
# verbose: 1 prints information useful for debugging
# Received data: <<data type|channel#|unit#|timestamps in seconds>>
def nrn_py_interfaceDpLwc(dir):
# Server loop for the lightweight client: connect, then receive raw records
# (4 doubles each: data type, channel, unit, time stamp) forever, optionally
# filter them, bin them per channel with nrn_py_binSpk() and hand each finished
# bin to feedQueue(). Leaves through sys.exit(0) when the client sends 'exit'.
# dir: directory for the timing-measurement output file (timeMeasure only).
# NOTE(review): `measurePort0..3` and `iteration` are not defined in the part
# of the file reviewed here - presumably module-level names set elsewhere.
print "nrn_py_interfaceDpLwc running..."
# MUA per channel in a binWnd: [spikes in ch0, spikes in ch1, ..., spikes in ch96, timeInterval]
mua = [0] * (CH_END + 1)
buf = 4096
if verbose:
# Remove previous data files
if isUdp:
fname = "Udp"
else:
fname = "Tcp"
for fileRemove in glob('*LWC*.tsv'):
os.unlink(fileRemove)
f1 = open(fname + "LWCTS.tsv", "w+")
f1.write('Type\tCH#\tUnit#\tTS\n')
f2 = open(fname + "LWC2.tsv", "w+")
f2.write('IntervalEnd\tSpikes in all CHs\tTotal spikes\n')
f3 = open(fname + "LWCMua.tsv", "w+")
f3.write('IntervalEnd\tMUA/Ch\n')
f4 = open(fname + "LWCSua.tsv", "w+")
f4.write('IntervalEnd\tSUA/Ch\n')
totalSpikeCnt = 0
validSpikeCnt = 0
spikePerTS = 0 # spike counts per timestamp
totalRcvBytes = 0
a = time.time()
# connect to Plexon client
Sock, conn, addr = nrn_py_connectPlxClient()
SN = 1 # serial number
lastEndTS = 0.0 # second
# start of the bin currently being filled (ms); advanced by binWnd per bin
binStart = 0
# latency measurement: one-element unsigned-short messages are sent over UDP
# to ackServerIP around the receive / filter / bin stages
if timeMeasure:
mAddr0 = (ackServerIP, measurePort0)
mAddr1 = (ackServerIP, measurePort1)
mAddr2 = (ackServerIP, measurePort2)
mAddr3 = (ackServerIP, measurePort3)
totalMUA = 0
mSock = socket(AF_INET, SOCK_DGRAM)
mSock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
mA = array.array('H')
mA.extend([0])
fTimeNum = 1
bTimeList = []
# Rcv messages from the client
while 1:
if verbose:
getTime1 = time.time()
if isUdp:
data, addr = Sock.recvfrom(buf)
else:
data = conn.recv(buf)
if verbose:
f3.write("gTime1: " + str(time.time() - getTime1) + "\t")
dataHave = 0
dataLen = len(data)
# NOTE(review): a message shorter than 4 bytes that is not 'exit' (including
# the empty string a closed TCP peer yields) matches neither branch below, so
# the loop simply receives again - confirm this cannot spin.
if data == 'exit':
print "Client has exited!"
if verbose:
print "Total spikes: ", totalSpikeCnt, "Valid spikes: ", validSpikeCnt
f1.write('Total spikes: ' + str(totalSpikeCnt) + '\tValid spikes: ' + str(validSpikeCnt) + '\n')
f2.write('Valid spikes: ' + str(validSpikeCnt) + '\n')
f1.close()
f2.close()
# close socket & generating exit and queue it
Sock.close()
sys.exit(0)
# The client sends data (>4) or a "NODATA" message when timeout occurs (==4)
elif dataLen >= 4:
if verbose:
getTime2 = time.time()
timeoutFlag = 0
if dataLen == 4:
# NODATA: no records, but force the current bin to close
cdataNp = np.zeros((0, 4))
timeoutFlag = 1
if verbose:
print "No data"
else:
if not isUdp:
# keep reading until whole 32-byte records (4 doubles) have arrived
# NOTE(review): if recv() returns an empty string here the length never
# changes and this loop would not terminate - confirm.
while dataLen % 32 != 0: # For avoiding errors because of MSS in TCP
data += conn.recv(buf)
dataLen = len(data)
dlen = dataLen / 8 # double : 8 bytes
drows = dlen / 4 # 4 fields : |data type|ch#|unit#|TS|
cdata = struct.unpack('d' * dlen, data)
cdataNp = np.asarray(cdata).reshape(drows, 4) # convert tuple to 2D numpy array
if timeMeasure:
mA[0] = O_RCV
mSock.sendto(mA.tostring(), mAddr3)
#### for filtering
mA[0] = O_SND
mSock.sendto(mA.tostring(), mAddr1)
####
# filtering
if syncSpkFilter > 0 and isLwc == 1:
cdataNp = nrn_py_filterSyncSpk(cdataNp)
if unsortedSpkFilter == 1 and isLwc == 1:
cdataNp = nrn_py_filterUnsortedSpk(cdataNp);
if timeMeasure:
#### for filtering
mA[0] = O_RCV
mSock.sendto(mA.tostring(), mAddr1)
####
# binning (Lwc)
# the condition requires isLwc == 1, i.e. binning is done here on the server
# (the original comment on the next line says HWC - confirm which is meant)
if isDp == 1 and isLwc == 1: # binning in the DP model with the HWC mode
if timeMeasure:
mA[0] = O_SND
mSock.sendto(mA.tostring(), mAddr2)
mua, binningComplete = nrn_py_binSpk(cdataNp, binStart, timeoutFlag)
if binningComplete == 1:
row, col = mua.shape
mua = mua[:, 0].tolist() # [[],[],..,[]] -> [, , ,..., ]
# NOTE(review): as written in this file nrn_py_binSpk always returns a
# (CH_END + 1, 1) array, so this row == 0 branch looks unreachable - confirm.
if row == 0:
mua = [-1] * (CH_END + 1) # for no mua data
# last slot carries the 1-based index of the bin just closed
mua[CH_END] = (binStart + binWnd) / binWnd # binning window
binStart = binStart + binWnd;
# NOTE(review): `cdata` is only assigned in the data branch above, so after a
# NODATA message cdata[1] refers to an earlier message or is unbound - confirm.
if verbose:
print "Client sends a NODATA message!"
timeStamp = cdata[1]
totalRcvBytes += NODATA_SIZE
f1.write(str(timeStamp) + '\n')
f2.write(str(timeStamp) + '\t' + str(spikePerTS) + '\t' + str(totalSpikeCnt) + '\t' + str(NODATA_SIZE) + '\n')
qtime = time.time()
if timeMeasure:
mA[0] = O_RCV
mSock.sendto(mA.tostring(), mAddr2)
#mA[0] = O_END
#mSock.sendto(mA.tostring(), mAddr)
timer = time.time()
feedQueue(mua, verbose) # add 0 + timeStamp to the queue
if timeMeasure: # timing measure
totalMUA +=1
if totalMUA <= iteration:
qTimeList.append(time.time() - timer)
# after `iteration` pushes, dump the measured push times once
# (this rebinds f1, which is also the verbose log handle opened above)
if totalMUA == iteration + 1:
if not os.path.exists(dir):
os.mkdir(dir)
fname = dir + "/LwcDpPushTimeFile"
f1 = open(fname, "w+")
for item in qTimeList:
f1.write("%s\n" % item)
f1.close()
print "====LwcDp push"
if verbose:
a = time.time() - qtime
if 0:
print "No data"
######################################################################
# For DP-HWC mode
# The HWC server with the Heavyweight client which sends processed data to the server
# Received data: <<MUAofCH1|SUA0ofCH1|SUA1ofCH1|SUA2ofCH1|SUA3ofCH1|SUA4ofCH1|...|Timestamps/TIME_WND>>
def nrn_py_interfaceDpHwc(dir):
# Server loop for the heavyweight client: connect, then receive already-binned
# messages forever and hand each one to feedQueue(). A 4-byte message is a
# NODATA/timeout notice; anything longer is unpacked as doubles. Leaves through
# sys.exit(0) when the client sends 'exit'.
# dir: directory for the timing-measurement output file (timeMeasure only).
# NOTE(review): `measurePort0..3`, `iteration` and `dataOffset` are not defined
# in the part of the file reviewed here - presumably set elsewhere; confirm.
print "nrn_py_interfaceDpHwc running..."
# MUA per channel in a binWnd: [spikes in ch0, spikes in ch1, ..., spikes in ch96, timeInterval]
mua = [0] * (CH_END + 1)
buf = 4096
if verbose:
# Remove previous data files
if isUdp:
fname = "Udp"
else:
fname = "Tcp"
for fileRemove in glob('*HWC*.tsv'):
os.unlink(fileRemove)
f1 = open(fname + "HWCMua.tsv", "w+")
f1.write('IntervalEnd\tMUA/ch\n')
f2 = open(fname + "HWC2.tsv", "w+")
f2.write('IntervalEnd\tSpikes in all CHs\tTotal spikes\n')
f3 = open(fname + "HWCSua.tsv", "w+")
f3.write('IntervalEnd\tSUA/Ch\n')
totalSpikeCnt = 0
spikePerTS = 0 # spike counts per timestamp
totalRcvBytes = 0
validSpikeCnt = 0
a = time.time()
# connect to Plexon client
Sock, conn, addr = nrn_py_connectPlxClient()
SN = 1 # serial number
lastEndTS = 0.0 # second
# Rcv messages from the client
# latency measurement: one-element unsigned-short messages are sent over UDP
# to ackServerIP when a data message has been received
if timeMeasure:
totalMUA = 0
mAddr0 = (ackServerIP, measurePort0)
mAddr1 = (ackServerIP, measurePort1)
mAddr2 = (ackServerIP, measurePort2)
mAddr3 = (ackServerIP, measurePort3)
mSock = socket(AF_INET, SOCK_DGRAM)
mSock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
mA = array.array('H')
mA.extend([0])
number = 1
while 1:
if verbose:
getTime1 = time.time()
if isUdp:
data, addr = Sock.recvfrom(buf)
else:
data = conn.recv(buf)
if verbose:
f3.write("gTime1: " + str(time.time() - getTime1) + "\t")
dataHave = 0
dataLen = len(data)
# NOTE(review): a message shorter than 4 bytes that is not 'exit' (including
# the empty string a closed TCP peer yields) matches neither branch below, so
# the loop simply receives again - confirm this cannot spin.
if data == 'exit':
print "Client has exited!"
if verbose:
print "Total spikes: ", totalSpikeCnt, "Valid spikes: ", validSpikeCnt
f1.write('Total spikes: ' + str(totalSpikeCnt) + '\tValid spikes: ' + str(validSpikeCnt) + '\n')
f2.write('Valid spikes: ' + str(validSpikeCnt) + '\n')
f1.close()
f2.close()
# close socket & generating exit and queue it
Sock.close()
sys.exit(0)
# The client sends data (>4) or a "NODATA" message when timeout occurs (==4)
elif dataLen >= 4:
if verbose:
getTime2 = time.time()
if dataLen == 4:
#'H': unsigned short (2bytes)
# NODATA: header id followed by the bin index; queued as an all -1 item
cdata = struct.unpack('H' * 2, data)
mua = [-1] * (CH_END + 1)
mua[CH_END] = cdata[1] # binning window
if verbose:
print "Client sends a NODATA message!"
timeStamp = cdata[1]
totalRcvBytes += NODATA_SIZE
f1.write(str(timeStamp) + '\n')
f2.write(str(timeStamp) + '\t' + str(spikePerTS) + '\t' + str(totalSpikeCnt) + '\t' + str(NODATA_SIZE) + '\n')
qtime = time.time()
feedQueue(mua, verbose) # add 0 + timeStamp to the queue
if verbose:
a = time.time() - qtime
if 0:
print "No data"
else:
if not isUdp:
# DATA_SIZE = (CH_END + 1) * 8 # 1 for binning window. 8 is double in Matlab
# NOTE(review): if recv() returns an empty string here the length never
# changes and this loop would not terminate - confirm.
while dataLen < DATA_SIZE: # For avoiding errors because of MSS in TCP
data += conn.recv(buf)
dataLen = len(data)
dlen = dataLen / 8 # double : 8 bytes # [ch1|ch2|...|ch96|binning window]
# mua is a tuple of doubles here (struct.unpack), not a list
mua = struct.unpack('d' * dlen, data)
if timeMeasure:
mA[0] = O_RCV
mSock.sendto(mA.tostring(), mAddr3)
timer = time.time()
feedQueue(mua, verbose)
if timeMeasure: # timing measure
totalMUA +=1
if totalMUA <= iteration:
qTimeList.append(time.time() - timer)
# after `iteration` pushes, dump the measured push times once
# (this rebinds f1, which is also the verbose log handle opened above)
if totalMUA == iteration + 1:
if not os.path.exists(dir):
os.mkdir(dir)
fname = dir + "/HwcDpPushTimeFile"
f1 = open(fname, "w+")
for item in qTimeList:
f1.write("%s\n" % item)
f1.close()
print "==========HwcDpPushTimeFile"
# NOTE(review): in this data branch `cdata` is not assigned (only the NODATA
# branch sets it) and `dataOffset` is not defined in view, so the verbose
# logging below may fail or read stale values - confirm.
if verbose:
totalRcvBytes += DATA_SIZE
# for verification between client and server
timeStamp = cdata[dataOffset + 1 + CH_END * 6] * binWnd
f3.write(str(timeStamp))
# print MUA/ch only
f1.write(str(timeStamp))
for i in range(CH_END):
spikePerTS += mua[i]
f1.write('\t' + str(mua[i])) # 6=> ch:unit0:unit1:unit2:unit3:unit4
for j in range(UNIT_END + 1):
f3.write('\t' + str(cdata[dataOffset + 1 + i * 6 + 1 + j]))
f1.write('\n')
f3.write('\n')
totalSpikeCnt += spikePerTS
f2.write(str(timeStamp) + '\t' + str(spikePerTS) + '\t' + str(totalSpikeCnt) + '\t' + str(DATA_SIZE) + '\n')
spikePerTS = 0
#########################################################################################
# The server with the Lightweight client which sends raw data to the server