1#!/usr/local/bin/python3.8 2 3# python3 status: started 4 5# Timing library, accessed via timing_tool.py. 6# 7# - offers class AfniTiming 8# 9# R Reynolds December, 2008 10 11import sys 12from afnipy import module_test_lib 13g_testlibs = ['sys', 'math', 'copy'] 14if module_test_lib.num_import_failures(g_testlibs): sys.exit(1) 15 16# import libraries 17import math 18import copy 19from afnipy import afni_util as UTIL 20from afnipy import lib_afni1D as LD 21 22g_marry_AM_methods = ['lin_run_fraq', 'lin_event_index'] 23g_tsv_def_labels = ['onset', 'duration', 'trial_type'] 24 25# ---------------------------------------------------------------------- 26# AfniTiming class - for stim times that are married with 27# either time intervals or magnitudes 28 29class AfniTiming(LD.AfniData): 30 def __init__(self, filename="", dur=-1, mdata=None, fsl_flist=[], verb=1): 31 """AFNI married stimulus timing class""" 32 33 super(AfniTiming, self).__init__(filename, mdata=mdata, 34 fsl_flist=fsl_flist, verb=verb) 35 36 # initialize duration data 37 if self.ready: self.init_durations(dur) 38 39 if self.verb > 3: self.show() 40 41 # old accessor functions from AfniTiming 42 def add_rows(self, newdata): 43 """add newdata.nrows of data (additional runs), return 0 on success""" 44 if not self.ready or not newdata.ready: 45 print('** AMTiming elements not ready for adding rows (%d,%d)' % \ 46 (self.ready, newdata.ready)) 47 return 1 48 49 if self.verb > 1: print('++ Timing: adding %d rows' % newdata.nrows) 50 51 if self.mtype != newdata.mtype: 52 print('** add rows: mis-match mtypes (%d vs. 
%d)' \ 53 % (self.mtype, newdata.mtype)) 54 55 # get data and mdata 56 edata = copy.deepcopy(newdata.data) 57 self.data.extend(edata) 58 59 edata = copy.deepcopy(newdata.mdata) 60 self.mdata.extend(edata) 61 62 self.nrows += newdata.nrows 63 64 if self.row_lens and newdata.row_lens: 65 self.row_lens.extend(newdata.row_lens) 66 67 return 0 68 69 def init_durations(self, dur=-1): 70 """initialize ddata as format [start_time, end_time] 71 """ 72 73 # start with general format 74 if not self.ready: return 75 76 # if dur is passed, first apply it 77 if dur >= 0: 78 for row in self.mdata: 79 for entry in row: entry[2] = dur 80 81 self.ddata = [] 82 for row in self.mdata: 83 self.ddata.append([[val[0],val[0]+val[2]] for val in row]) 84 85 if dur > 0: self.mtype |= LD.MTYPE_DUR # set bit 86 elif dur == 0: self.mtype &= LD.MTYPE_AMP # only allow AMP to go through 87 # else leave as is 88 89 self.dur_len = dur 90 91 def set_dur_len(self): 92 self.dur_len = self.get_duration() 93 94 def extend_rows(self, newdata): 95 """extend each row by the corresponding row of brows""" 96 97 if self.extend_data_rows(newdata): return 1 98 99 # if interval lengths are not equal, must clear it 100 if self.dur_len != newdata.dur_len: self.dur_len = -1 101 102 return 0 103 104 def get_start_end_timing(self, sort=0): 105 """return a 2D array of [start, finish] times (so 3D object)""" 106 sftimes = [] 107 for row in self.mdata: 108 # if married, include modulators 109 if self.married: 110 times = [[e[0],e[0]+e[2], e[1]] for e in row] 111 else: 112 times = [[e[0],e[0]+e[2]] for e in row] 113 if sort: times.sort() 114 sftimes.append(times) 115 return sftimes 116 117 def partition(self, part_file, prefix): 118 """partition timing based on part_file labels, write each part to 119 prefix_label.1D""" 120 121 if not self.ready: 122 print('** Timing element not ready for partitioning') 123 return 1 124 125 labels = read_value_file(part_file) 126 if labels == None: 127 print("** failed to read partition 
   def global_to_local(self, run_len):
      """convert global times to local, based in run_len array
         return 0 on success, 1 on any error

         Requires rectangular, single-column timing (i.e. one global list
         of times).  The times are transposed/sorted into one row, then
         distributed across runs based on cumulative run lengths.
      """

      if not self.ready:
         print('** global timing not ready')
         return 1

      if len(run_len) == 0:
         print('** global_to_local requires -run_len')
         return 1

      if not self.is_rect():
         print('** global timing is not rectangular')
         return 1

      rlen = len(self.mdata[0])          # note row length

      if rlen > 1:
         print('** global timing is not a single column')
         return 1

      if rlen < 1:       return 0        # nothing to do
      if self.nrows < 2: return 0        # nothing to do

      # make just one row and sort
      self.transpose()
      self.sort()

      if self.verb > 2:
         self.show('global_to_local')
         print('-- run lengths : %s' % run_len)

      if self.verb > 4:
         print('-- global start time matrix %s' % self.data)

      # now walk through runs and insert times as we go
      newdata = []
      newmdata = []
      endtime = 0.0
      sind = 0                           # index of next unassigned event
      stimes = self.mdata[0]
      ntimes = len(stimes)
      for etime in run_len:
         starttime = endtime
         endtime += etime
         if sind >= ntimes:      # only empty runs left
            newdata.append([])
            newmdata.append([])
            continue
         # times are left, decide which go for this run
         last = sind
         while last < ntimes and stimes[last][0] < endtime: last += 1
         mtimes = stimes[sind:last]
         # shift from global to run-relative time (in place)
         for tentry in mtimes: tentry[0] -= starttime
         newdata.append([t[0] for t in mtimes])
         newmdata.append(mtimes)
         sind = last

      # insert any remaining times at end of last run(and warn user)
      if sind < ntimes:
         if self.verb > 0:
            print('** global to local: %d times after last run' % (ntimes-sind))
         mtimes = stimes[sind:]
         for tentry in mtimes: tentry[0] -= starttime
         newdata[-1].extend([t[0] for t in mtimes])
         newmdata[-1].extend(mtimes)

      del(self.data)
      del(self.mdata)
      self.data = newdata
      self.mdata = newmdata
      self.nrows = len(self.mdata)

      if self.verb > 4:
         print('-- local time matrix %s' % newdata)

      return 0

   def local_to_global(self, run_len):
      """convert local times to global times, based in run_len array
         return 0 on success, 1 on any error

         Result has one event per row (single global event list).
      """

      if not self.ready:
         print('** local timing not ready')
         return 1

      if len(run_len) != self.nrows:
         print('** local_to_global: have %d run times but %d data rows' \
               % (len(run_len), self.nrows))
         return 1

      # make sure it is sorted for this
      self.sort()

      if self.verb > 2:
         self.show('local_to_global')
         print('-- run lengths : %s' % run_len)

      if self.verb > 4:
         print('-- local time matrix %s' % self.data)

      # now walk through runs and insert times as we go
      newdata = []
      newmdata = []
      runstart = 0.0
      for rind, rtime in enumerate(run_len):
         # each new time is a new row
         mrow = copy.deepcopy(self.mdata[rind])
         for ind, mtime in enumerate(mrow):
            mtime[0] += runstart
            newdata.append([mtime[0]])
            newmdata.append([mtime])
         runstart += rtime      # last one is useless

      del(self.data)
      self.data = newdata
      del(self.mdata)
      self.mdata = newmdata
      self.nrows = len(self.data)

      if self.verb > 4:
         print('-- global time matrix %s' % newdata)

      return 0
ready') 260 return 1 261 262 if len(run_len) != self.nrows: 263 print('** local_to_global: have %d run times but %d data rows' \ 264 % (len(run_len), self.nrows)) 265 return 1 266 267 # make sure it is sorted for this 268 self.sort() 269 270 if self.verb > 2: 271 self.show('local_to_global') 272 print('-- run lengths : %s' % run_len) 273 274 if self.verb > 4: 275 print('-- local time matrix %s' % self.data) 276 277 # now walk through runs and insert times as we go 278 newdata = [] 279 newmdata = [] 280 runstart = 0.0 281 for rind, rtime in enumerate(run_len): 282 # each new time is a new row 283 mrow = copy.deepcopy(self.mdata[rind]) 284 for ind, mtime in enumerate(mrow): 285 mtime[0] += runstart 286 newdata.append([mtime[0]]) 287 newmdata.append([mtime]) 288 runstart += rtime # last one is useless 289 290 del(self.data) 291 self.data = newdata 292 del(self.mdata) 293 self.mdata = newmdata 294 self.nrows = len(self.data) 295 296 if self.verb > 4: 297 print('-- global time matrix %s' % newdata) 298 299 return 0 300 301 def add_val(self, val): 302 """add the given value to each element""" 303 if not self.ready: return 1 304 305 if type(val) == str: 306 try: val = float(val) 307 except: 308 print("** invalid value to add to timing: '%s'" % val) 309 return 1 310 311 if self.verb > 1: print('-- Timing: adding %f to data...' % val) 312 313 for row in self.data: 314 for ind in range(len(row)): 315 row[ind] += val 316 317 for row in self.mdata: 318 for ind in range(len(row)): 319 row[ind][0] += val 320 321 return 0 322 323 def shift_to_offset(self, offset=0): 324 """shift all run times to start at the given offset 325 (offset should be <= first values)""" 326 327 if not self.ready: return 1 328 329 if type(offset) == str: 330 try: offset = float(offset) 331 except: 332 print("** invalid offset to add to timing: '%s'" % offset) 333 return 1 334 335 if self.verb > 1: print('-- timing: setting offset to %f ...' 
% offset) 336 337 # make sure it is sorted for this 338 self.sort() 339 340 del(self.data) 341 self.data = [] 342 343 for rind, row in enumerate(self.mdata): 344 if len(row) < 1: 345 self.data.append([]) 346 continue 347 diff = row[0][0] - offset 348 if diff < 0: 349 print('** offset shift to %f too big for run %d' % (offset, rind)) 350 return 1 351 for ind in range(len(row)): 352 row[ind][0] -= diff 353 self.data.append([e[0] for e in row]) 354 355 return 0 356 357 def scale_val(self, val): 358 """multiply the given value into each element""" 359 if not self.ready: return 1 360 361 if type(val) == type('hi'): 362 try: val = float(val) 363 except: 364 print("** invalid value to scale into timing: '%s'" % val) 365 return 1 366 367 if self.verb > 1: print('-- Timing: scaling data by %f ...' % val) 368 369 for row in self.data: 370 for ind in range(len(row)): 371 row[ind] *= val 372 373 for row in self.mdata: 374 for ind in range(len(row)): 375 row[ind][0] *= val 376 377 return 0 378 379 def round_times(self, tr, round_frac=1.0): 380 """round/truncate times to multiples of the TR 381 382 round_frac : fraction of TR required to "round up" 383 e.g. 
0.5 : normal rounding 384 1.0 : never round up, floor() 385 0.0 : always round up, ceil() 386 """ 387 if not self.ready: return 1 388 if tr <= 0: 389 print("** truncate_times: invalid tr %s" % tr) 390 return 1 391 392 # convert to fraction to add before truncation (and be sure of type) 393 try: rf = 1.0 - round_frac 394 except: 395 print("** truncate_times: invalid round_frac '%s'" % round_frac) 396 return 1 397 398 try: tr = float(tr) 399 except: 400 print("** truncate_times: invalid tr '%s'" % tr) 401 return 1 402 403 if rf < 0.0 or rf > 1.0: 404 print("** truncate_times: bad round_frac outside [0,1]: %s" % rf) 405 return 1 406 407 if self.verb > 1: 408 print('-- Timing: round times to multiples of %s (frac = %s)'%(tr,rf)) 409 410 # fr to use for floor and ceil (to assist with fractional TRs) 411 if tr == math.floor(tr): tiny = 0.0 412 else: tiny = 0.0000000001 413 414 # scale mdata and re-create data 415 del(self.data) 416 self.data = [] 417 for row in self.mdata: 418 for ind in range(len(row)): 419 # start with TR index 420 tind = row[ind][0]/tr 421 # note that rf = 0 now means floor and 1 means ceil 422 423 # add/subract a tiny fraction even for truncation 424 if rf == 1.0 : 425 if tind == 0: val = 0.0 # to avoid tiny negatives 426 else: val = math.ceil(tind-tiny) * tr 427 elif rf == 0.0 : val = math.floor(tind+tiny) * tr 428 else : val = math.floor(tind+rf) * tr 429 row[ind][0] = val 430 self.data.append([e[0] for e in row]) 431 432 return 0 433 434 def marry_AM(self, mtype, rlens=[], nplaces=-1): 435 """add modulator of given type 436 437 lin_run_fraq : [0,1] modulator = event time/run dur 438 lin_run_fraq_off : [0,1] modulator = event time from first/run dur 439 lin_event_index : 0, 1, 2, ... 
= event index 440 441 if rlens is needed should match len(mdata) 442 """ 443 if not self.ready: return 1 444 # g_marry_AM_methods = ['lin_run_fraq', 'lin_event_index'] 445 446 if mtype not in g_marry_AM_methods: 447 print("** marry_AM: invalid mod type '%s'" % mtype) 448 return 1 449 450 if len(rlens) == 1 and len(self.mdata) > 1: 451 rlens = rlens*len(self.mdata) 452 453 # most types need the run lengths 454 if mtype != 'lin_event_index': 455 nruns = len(self.mdata) 456 nlens = len(rlens) 457 if nlens == 0: 458 print('** marry_AM needs run lengths') 459 return 1 460 if nlens != nruns: 461 print('** marry_AM: have %d runs but %d run lengths'%(nruns, nlens)) 462 return 1 463 464 # append the modulator to each event in mdata 465 for rind, mrun in enumerate(self.mdata): 466 if mtype == 'lin_event_index': rlen = 0 467 else: rlen = rlens[rind] 468 469 for ind, event in enumerate(mrun): 470 if mtype == 'lin_event_index': 471 mval = ind+1 472 elif mtype == 'lin_run_fraq': 473 mval = event[0]/float(rlen) 474 if nplaces >= 0: 475 power = 10.0**nplaces 476 mval = int(power*mval)/power 477 event[1].append(mval) 478 479 # and be sure it is married 480 self.married = 1 481 self.mtype |= LD.MTYPE_AMP 482 483 return 0 484 485 def write_times(self, fname='', nplaces=-1, mplaces=-1, force_married=0): 486 """write the current M timing out, with nplaces right of the decimal 487 488 if force_married, force to married timing, if appropriate 489 """ 490 491 # inherited from lib_afni1D 492 simple = 1 493 if force_married: 494 self.write_dm = 1 495 simple = 0 496 self.write_as_timing(fname, nplaces=nplaces, mplaces=mplaces, 497 check_simple=simple) 498 499 return 0 500 501 def timing_to_1D(self, run_len, tr, min_frac, per_run=0, allow_warns=0, 502 write_mods=0): 503 """return a 0/1 array of trs surviving min_frac cutoff 504 505 run_len : list of run durations, in seconds 506 (each must encompass the last stimulus, of course) 507 tr : time resolution of output 1D file 508 min_frac : 
minimum TR fraction occupied by stimulus required 509 for a set value in output array 510 (must be in [0.0, 1.0]) 511 per_run : if set, result is list of runs, else catenated 512 write_mods: if set, do not write 1.0, but the modulator 513 514 return an error string and the 0/1 array 515 (on success, the error string is '') 516 """ 517 518 if self.verb > 2: 519 print("-- t21D: per_run %s, allow_wars %s, write_mods %s" \ 520 % (per_run, allow_warns, write_mods) ) 521 522 # maybe the user provided only one run length 523 if self.nrows > 0 and len(run_len) == 1: 524 run_len = [run_len[0]] * self.nrows 525 526 errstr, result, modres = self.timing_to_tr_frac(run_len, tr, per_run, 527 allow_warns=allow_warns, write_mods=write_mods) 528 529 if errstr != '' or len(result) < 1: return errstr, result 530 531 if per_run: 532 new_res = [] 533 for rind, row in enumerate(result): 534 thr_res = self.get_threshold_list(row, min_frac) 535 # if write_mods, convert binary to modulator 536 if write_mods: 537 mrow = modres[rind] 538 if len(mrow) == len(thr_res): 539 for ind in range(len(thr_res)): thr_res[ind] *= mrow[ind] 540 new_res.append(thr_res) 541 else: 542 new_res = self.get_threshold_list(result, min_frac) 543 # if write_mods, convert binary to modulator 544 if write_mods and len(modres) == len(new_res): 545 for ind in range(len(new_res)): new_res[ind] *= modres[ind] 546 547 del(result) 548 del(modres) 549 550 return '', new_res 551 552 def get_threshold_list(self, data, min_val): 553 """return a 0/1 copy of 'data', with each value set iff >= min_val""" 554 result = [0 for ind in range(len(data))] 555 for ind, val in enumerate(data): 556 if val >= min_val: result[ind] = 1 557 return result 558 559 def timing_to_tr_frac(self, run_len, tr, per_run=0, allow_warns=0, 560 write_mods=0): 561 """return an array of stim fractions, where is value is the fraction 562 of the current TR occupied by stimulus 563 564 --> if per_run is set, output will be one row per run 565 566 The output 
will be of length sum(TRs per run) and will be in [0,1.0]. 567 Note that a single stimulus can span multiple TRs. 568 569 ** save this comment for elsewhere 570 ** save min_tr_frac for elsewhere 571 min_tr_frac : minimum fraction of a TR required to set it 572 (must be in (0.0, 1.0]) 573 Note that if they are not TR-locked and min_tr_frac <= 0.5, then 574 one stimulus lasting one TR but occuring on the half TR can result 575 in a pair of consecutive 1s. 576 ** end save 577 578 run_len : list of run durations, in seconds 579 (each must encompass the last TR, of course) 580 tr : time resolution of output 1D file 581 per_run : make a list of results (else all one) 582 allow_warns : make some issues non-fatal 583 write_mods : return mod values along with tr fractions 584 585 On success, the error string should be empty and stim_list should not. 586 587 return error string, stim list, mod list (if write_mods) 588 589 ** testing ** 590 591 import math, copy 592 from afnipy import afni_util as UTIL, lib_timing as LT 593 reload LT 594 t = LT.AfniTiming('ch_fltr.txt') 595 t.timing_to_tr_frac(run_len, 2.5) 596 t.timing_to_1D(run_len, 2.5, 0.3) 597 """ 598 599 if not self.ready: 600 return '** M Timing: nothing to compute ISI stats from', [], [] 601 602 #if not self.mtype & LD.MTYPE_DUR: 603 # return '** M Timing: cannot compute stats without stim duration', [] 604 605 if self.nrows != len(self.data): 606 return '** bad MTiming, nrows=%d, datalen=%d, failing...' % \ 607 (self.nrows, len(self.data)), [], [] 608 609 if self.nrows != len(run_len): 610 return '** run_len list is %d of %d runs in timing_to_1D: %s' \ 611 % (len(run_len), self.nrows, run_len), [], [] 612 613 if tr <= 0.0: 614 return '** timing_to_tr, illegal TR <= 0: %g' % tr, [], [] 615 616 if write_mods and not self.married: 617 print("** to1D: asking for modulatros, but times are not married") 618 write_mods = 0 619 620 # make a sorted copy of format run x stim x [start,end], i.e. 
is 3-D 621 tdata = self.get_start_end_timing(sort=1) 622 623 if self.verb > 1: 624 if write_mods: mstr = ', will write mods' 625 else: mstr = '' 626 print('-- timing_to_tr_fr, tr = %g, nruns = %d%s' \ 627 % (tr,len(run_len),mstr)) 628 629 # need to check each run for being empty 630 for ind, data in enumerate(tdata): 631 if len(data) < 1: continue 632 if data[-1][1] > run_len[ind] or run_len[ind] < 0: 633 entry = data[-1] 634 if entry[0] > run_len[ind] or run_len[ind] < 0: 635 return '** run %d, stim starts after end of run' % (ind+1),[],[] 636 elif not allow_warns: 637 return '** run %d, stim ends after end of run' % (ind+1), [],[] 638 else: 639 # entry[1] > run_len[ind] 640 print('** WARNING: run %d stim ends after end of run'%(ind+1)) 641 print(' onset %g, offset %g, end of run %g' \ 642 % (entry[0], entry[1], run_len[ind])) 643 print(' --> truncating end time') 644 entry[1] = entry[0] + 0.99*(run_len[ind]-entry[0]) 645 646 result = [] 647 modres = [] # for modulated results 648 # process one run at a time, first converting to TR indices 649 for rind, data in enumerate(tdata): 650 if self.verb > 4: 651 print('\n++ stimulus on/off times, run %d :' % (rind+1)) 652 print(data) 653 654 for tind in range(len(data)): # convert seconds to TRs 655 data[tind][0] = round(data[tind][0]/float(tr),3) 656 data[tind][1] = round(data[tind][1]/float(tr),3) 657 658 if tind > 0 and data[tind][0] < data[tind-1][1]: 659 if allow_warns: 660 print('** run %d, index %d, stimulus overlap with next' \ 661 % (rind, tind)) 662 else: 663 return '** run %d, index %d, stimulus overlap with next' \ 664 % (rind, tind), [], [] 665 666 if self.verb > 4: 667 print('++ stimulus on/off TR times, run %d :' % (rind+1)) 668 print(data) 669 if self.verb > 3: 670 print('++ tr fractions, run %d :' % (rind+1)) 671 print([data[tind][1]-data[tind][0] for tind in range(len(data))]) 672 673 # do the real work, for each stimulus, fill appropriate tr fractions 674 # init frac list with TR timing (and enough TRs 
   def show_isi_stats(self, mesg='', run_len=[], tr=0, rest_file=''):
      """display ISI timing statistics

            mesg        : display the user message first
            run_len     : can be empty, length 1 or length nrows
            tr          : if > 0: show mean/stdev for stimuli within TRs
                          (so 0 <= mean < tr)
            rest_file   : if set, save all rest durations

         display these statistics:
            - total time, total stim time, total rest time

            - total time per run
            - total stim time per run
            - total rest time per run

            - pre-stim rest per run
            - post-stim response rest per run (if run_len is given)
            - total ISI rest per run

            - min/mean/max (stdev) of stim duration
            - min/mean/max (stdev) of ISI rest
      """

      if not self.ready:
         print('** M Timing: nothing to compute ISI stats from')
         return 1

      if not (self.mtype & LD.MTYPE_DUR):
         print('** warning: computing stats without duration')

      if self.nrows != len(self.data):
         print('** bad MTiming, nrows=%d, datalen=%d, failing...' % \
               (self.nrows, len(self.data)))
         return 1

      # make a sorted copy
      scopy = self.copy()
      scopy.sort()

      # make a copy of format run x stim x [start,end], i.e. is 3-D
      tdata = scopy.get_start_end_timing()

      # make an updated run lengths list (0, 1 or nrows entries allowed)
      if len(run_len) == 0:
         rlens = [0 for rind in range(self.nrows)]
      elif len(run_len) == 1:
         rlens = [run_len[0] for rind in range(self.nrows)]
      elif len(run_len) == self.nrows:
         rlens = run_len
      else:     # failure
         print('** invalid len(run_len)=%d, must be one of 0,1,%d' % \
               (len(run_len), self.nrows))
         return 1

      if self.verb > 1:
         print('-- show_isi_stats, run_len = %s, tr = %s, rest_file = %s' \
               % (run_len, tr, rest_file))

      if self.verb > 3:
         print(scopy.make_data_string(nplaces=1, flag_empty=0, check_simple=0,
                                      mesg='scopy data'))

      all_stim  = []    # all stimulus durations (one row per run)
      all_isi   = []    # all isi times (one row per run)
      pre_time  = []    # pre-stim, per run
      post_time = []    # pose-stim, per run
      run_time  = []    # total run time, per run
      errs      = 0     # allow a few errors before failing
      max_errs  = 10
      for rind in range(self.nrows):
         run  = tdata[rind]
         rlen = rlens[rind]

         if len(run) == 0:      # empty run
            all_stim.append([])
            all_isi.append([])
            pre_time.append(rlen)
            post_time.append(0.0)
            run_time.append(rlen)
            continue

         # if no run len specified, use end of last stimulus
         if rlen == 0.0: rlen = run[-1][1]
         elif rlen < run[-1][1]:
            print('** run %d: given length = %s, last stim ends at %s' % \
                  (rind+1, rlen, run[-1][1]))
            errs += 1
            if errs > max_errs:
               print('** bailing...')
               return 1

         # pre- and post-stim times are set
         pre  = run[0][0]
         post = rlen - run[-1][1]

         if pre < 0:
            print('** ISI error: first stimulus of run %d at negative time %s'%\
                  (rind+1, run[0][0]))
            errs += 1
            if errs > max_errs:
               print('** bailing...')
               return 1

         # init accumulation vars
         stimes = [run[0][1] - run[0][0]]
         itimes = []

         # for each following index, update stim and isi times
         # (check for bad overlap)
         tot_olap = 0
         n_olap = 0
         for sind in range(1, len(run)):
            stime = run[sind][1]-run[sind][0]
            itime = run[sind][0]-run[sind-1][1]
            olap  = -itime      # for clarity
            # there may be float/ascii reason for a tiny overlap...
            if olap > 0.01:
               if self.verb > 1:
                  print('** ISI error: stimuli overlap at run %d, time %s,' \
                        ' overlap %s' % (rind+1, run[sind][0], olap))
            if olap > 0.0001:
               # get rid of the overlap (note that itime < 0)
               n_olap += 1
               # fix itime and stime
               itime = 0
               stime -= olap
               tot_olap += olap
            stimes.append(stime)
            itimes.append(itime)

         if n_olap > 0:
            print('** run %d, adjusted for %d overlaps in stim, total = %g s' \
                  % (rind, n_olap, tot_olap))

         # store results
         all_stim.append(stimes)
         all_isi.append(itimes)
         pre_time.append(pre)
         post_time.append(post)
         run_time.append(rlen)

      if errs > 0: return 1

      # tally the results
      rtot_stim = [] ; rtot_isi = [] ; rtot_rest = []
      stim_list = [] ; isi_list = [] ; nstim_list = []
      for rind in range(self.nrows):
         rtot_stim.append(UTIL.loc_sum(all_stim[rind]))
         rtot_rest.append(pre_time[rind] + UTIL.loc_sum(all_isi[rind]) +
                          post_time[rind])
         rtot_isi.append(UTIL.loc_sum(all_isi[rind]))
         stim_list.extend(all_stim[rind])
         isi_list.extend(all_isi[rind])
         nstim_list.append(len(all_stim[rind]))

      if mesg: mstr = '(%s) ' % mesg
      else:    mstr = ''
      print('\nISI statistics %s:\n' % mstr)

      print('                        total      per run')
      print('                       ------      ------------------------------')
      print('    total time         %6.1f     %s'   % \
            (UTIL.loc_sum(run_time), float_list_string(run_time, ndec=1)))
      print('    total time: stim   %6.1f     %s'   % \
            (UTIL.loc_sum(rtot_stim),float_list_string(rtot_stim,7,ndec=1)))
      print('    total time: rest   %6.1f     %s'   % \
            (UTIL.loc_sum(rtot_rest),float_list_string(rtot_rest,7,ndec=1)))
      print('')
      print('    rest: total isi    %6.1f     %s'   % \
            (UTIL.loc_sum(rtot_isi), float_list_string(rtot_isi,7,ndec=1)))
      print('    rest: pre stim     %6.1f     %s'   % \
            (UTIL.loc_sum(pre_time), float_list_string(pre_time,7,ndec=1)))
      print('    rest: post stim    %6.1f     %s'   % \
            (UTIL.loc_sum(post_time),float_list_string(post_time,7,ndec=1)))
      print('')
      print('    num stimuli      %6d     %s'   % \
            (UTIL.loc_sum(nstim_list), float_list_string(nstim_list,7,ndec=0)))
      print('\n')

      print('                         min      mean     max     stdev')
      print('                       -------  -------  -------  -------')

      print('    rest: pre-stim     %7.3f  %7.3f  %7.3f  %7.3f' % \
            (UTIL.min_mean_max_stdev(pre_time)))
      print('    rest: post-stim    %7.3f  %7.3f  %7.3f  %7.3f' % \
            (UTIL.min_mean_max_stdev(post_time)))
      print('')

      for ind in range(self.nrows):
         m0, m1, m2, s = UTIL.min_mean_max_stdev(all_isi[ind])
         print('    rest: run #%d ISI  %7.3f  %7.3f  %7.3f  %7.3f' % \
               (ind, m0, m1, m2, s))

      print('')
      print('    all runs: ISI      %7.3f  %7.3f  %7.3f  %7.3f' % \
            (UTIL.min_mean_max_stdev(isi_list)))
      print('    all runs: stimuli  %7.3f  %7.3f  %7.3f  %7.3f' % \
            (UTIL.min_mean_max_stdev(stim_list)))
      print('')

      # and possibly print out offset info
      if tr > 0: self.show_TR_offset_stats(tr, '')

      # maybe write all rest durations
      if rest_file:
         # track actual rest: pre-stim rest + ISIs + post-stim rest
         all_rest = copy.deepcopy(all_isi)
         for run, rest in enumerate(all_rest):
            rest[:0] = [pre_time[run]]
            rest.append(post_time[run])
         UTIL.write_to_timing_file(all_rest, rest_file)

      # clean up, just to be kind
      del(all_stim); del(all_isi); del(pre_time); del(post_time); del(run_time)

      del(rtot_stim); del(rtot_isi); del(rtot_rest); del(stim_list)
      del(isi_list); del(nstim_list)

      del(scopy)
      del(tdata)
(UTIL.loc_sum(rtot_isi), float_list_string(rtot_isi,7,ndec=1))) 909 print(' rest: pre stim %6.1f %s' % \ 910 (UTIL.loc_sum(pre_time), float_list_string(pre_time,7,ndec=1))) 911 print(' rest: post stim %6.1f %s' % \ 912 (UTIL.loc_sum(post_time),float_list_string(post_time,7,ndec=1))) 913 print('') 914 print(' num stimuli %6d %s' % \ 915 (UTIL.loc_sum(nstim_list), float_list_string(nstim_list,7,ndec=0))) 916 print('\n') 917 918 print(' min mean max stdev') 919 print(' ------- ------- ------- -------') 920 921 print(' rest: pre-stim %7.3f %7.3f %7.3f %7.3f' % \ 922 (UTIL.min_mean_max_stdev(pre_time))) 923 print(' rest: post-stim %7.3f %7.3f %7.3f %7.3f' % \ 924 (UTIL.min_mean_max_stdev(post_time))) 925 print('') 926 927 for ind in range(self.nrows): 928 m0, m1, m2, s = UTIL.min_mean_max_stdev(all_isi[ind]) 929 print(' rest: run #%d ISI %7.3f %7.3f %7.3f %7.3f' % \ 930 (ind, m0, m1, m2, s)) 931 932 print('') 933 print(' all runs: ISI %7.3f %7.3f %7.3f %7.3f' % \ 934 (UTIL.min_mean_max_stdev(isi_list))) 935 print(' all runs: stimuli %7.3f %7.3f %7.3f %7.3f' % \ 936 (UTIL.min_mean_max_stdev(stim_list))) 937 print('') 938 939 # and possibly print out offset info 940 if tr > 0: self.show_TR_offset_stats(tr, '') 941 942 # maybe write all rest durations 943 if rest_file: 944 all_rest = copy.deepcopy(all_isi) 945 for run, rest in enumerate(all_rest): 946 rest[:0] = [pre_time[run]] 947 rest.append(post_time[run]) 948 UTIL.write_to_timing_file(all_rest, rest_file) 949 950 # clean up, just to be kind 951 del(all_stim); del(all_isi); del(pre_time); del(post_time); del(run_time) 952 953 del(rtot_stim); del(rtot_isi); del(rtot_rest); del(stim_list) 954 del(isi_list); del(nstim_list) 955 956 del(scopy) 957 del(tdata) 958 959 def get_TR_offset_stats(self, tr): 960 """create a list of TR offsets (per-run and overall) 961 962 tr : must be positive 963 964 return: 8 values in a list: 965 min, mean, maxabs, stdev of absolute and fractional offsets 966 empty list on error 967 """ 968 969 
if not self.ready: 970 print('** M Timing: nothing to compute ISI stats from') 971 return [] 972 973 if self.nrows != len(self.data): 974 print('** bad MTiming, nrows=%d, datalen=%d, failing...' % \ 975 (self.nrows, len(self.data))) 976 return [] 977 978 if tr < 0.0: 979 print('** show_TR_offset_stats: invalid TR %s' % tr) 980 return [] 981 982 tr = float(tr) # to be sure 983 984 # make a copy of format run x stim x [start,end], i.e. is 3-D 985 tdata = self.get_start_end_timing() 986 987 offsets = [] # stim offsets within given TRs 988 for rind in range(self.nrows): 989 run = tdata[rind] 990 if len(run) == 0: continue 991 992 roffsets = UTIL.interval_offsets([val[0] for val in run], tr) 993 offsets.extend(roffsets) 994 995 if len(offsets) < 1: return [] 996 997 # get overall stats (absolute and fractional) 998 999 # absolute 1000 m0, m1, m2, s = UTIL.min_mean_max_stdev(offsets) 1001 offmn = m0; offm = m1; offs = s 1002 mn = abs(min(offsets)) 1003 offmax = abs(max(offsets)) 1004 if mn > offmax: offmax = mn 1005 1006 # fractional 1007 for ind, val in enumerate(offsets): 1008 offsets[ind] = val/tr 1009 m0, m1, m2, s = UTIL.min_mean_max_stdev(offsets) 1010 1011 del(offsets) 1012 1013 return [offmn, offm, offmax, offs, m0, m1, offmax/tr, s] 1014 1015 def show_TR_offset_stats(self, tr, mesg='', wlimit=0.4): 1016 """display statistics regarding within-TR offsets of stimuli 1017 1018 tr : show mean/stdev for stimuli within TRs 1019 (so 0 <= mean < tr) 1020 mesg : display the user message in the output 1021 """ 1022 1023 rv, rstr = self.get_TR_offset_stats_str(tr, mesg=mesg, wlimit=wlimit) 1024 print(rstr) 1025 1026 def get_TR_offset_stats_str(self, tr, mesg='', wlimit=0.4): 1027 """return a string to display statistics regarding within-TR 1028 offsets of stimuli 1029 1030 tr : show mean/stdev for stimuli within TRs 1031 (so 0 <= mean < tr) 1032 mesg : display the user message in the output 1033 1034 return status, stats string 1035 1036 status > 0 : success, warnings were 
issued 1037 = 0 : success, no warnings 1038 < 0 : errors 1039 """ 1040 1041 if not self.ready: 1042 return 1, '** M Timing: nothing to compute ISI stats from' 1043 1044 if self.nrows != len(self.data): 1045 return 1, '** bad MTiming, nrows=%d, datalen=%d, failing...' % \ 1046 (self.nrows, len(self.data)) 1047 1048 if tr < 0.0: 1049 return 1, '** show_TR_offset_stats: invalid TR %s' % tr 1050 1051 off_means = [] # ... means per run 1052 off_stdev = [] # ... stdevs per run 1053 for rind in range(self.nrows): 1054 run = self.data[rind] 1055 if len(run) == 0: continue 1056 1057 # start with list of time remainders (offsets) within each TR 1058 roffsets = UTIL.interval_offsets([val for val in run], tr) 1059 1060 m0, m1, m2, s = UTIL.min_mean_max_stdev(roffsets) 1061 off_means.append(m1) 1062 off_stdev.append(s) 1063 1064 # if no events ere found, we're outta here 1065 if len(off_means) == 0: 1066 print('file %s: no events?' % self.name) 1067 return 0, '' 1068 1069 # and get overall stats (absolute and fractional) 1070 offs = self.get_TR_offset_stats(tr) 1071 1072 # print out offset info 1073 if mesg: mstr = '(%s) ' % mesg 1074 else: mstr = '' 1075 1076 rstr = '\nwithin-TR stimulus offset statistics %s:\n' % mstr 1077 1078 hdr1 = ' overall: ' 1079 hdr2 = ' fractional: ' 1080 shdr = ' ' 1081 rv = 0 1082 if self.nrows > 1: 1083 rstr += ' per run\n' \ 1084 ' ------------------------------\n' \ 1085 ' offset means %s\n' \ 1086 ' offset stdevs %s\n' \ 1087 '\n' \ 1088 % (float_list_string(off_means,ndec=3), 1089 float_list_string(off_stdev,ndec=3)) 1090 else: hdr1 = ' one run: ' 1091 1092 rstr += '%smean = %.3f maxoff = %.3f stdev = %.4f\n' \ 1093 % (hdr1, offs[1], offs[2], offs[3]) 1094 rstr += '%smean = %.3f maxoff = %.3f stdev = %.4f\n' \ 1095 % (hdr2, offs[5], offs[6], offs[7]) 1096 1097 # a warning may be issued if the min is positive and the max is small 1098 if offs[6] == 0: rstr += '\n%s(stimuli are TR-locked)\n' % shdr 1099 elif wlimit > 0.0 and offs[4] > 0 and 
offs[6] < wlimit: 1100 rstr += '\n' \ 1101 '%s** WARNING: small maxoff suggests (almost) TR-locked stimuli\n'\ 1102 '%s consider: timing_tool.py -round_times (if basis = TENT)\n' \ 1103 %(shdr,shdr) 1104 rv = 1 1105 1106 # clean up, just to be kind 1107 del(off_means); del(off_stdev) 1108 1109 return rv, rstr 1110 1111def float_list_string(vals, nchar=7, ndec=3, nspaces=2): 1112 str = '' 1113 for val in vals: str += '%*.*f%*s' % (nchar, ndec, val, nspaces, '') 1114 1115 return str 1116 1117def read_multi_3col_tsv(flist, hlabels=None, def_dur_lab=None, 1118 show_only=0, tsv_int=None, verb=1): 1119 """Read a set of 3 column tsv (tab separated value) files 1120 - one file per run 1121 - each with a list of events for all classes 1122 and convert it to list of AfniTiming instances. 1123 1124 A 3 column tsv file should have an optional header line, 1125 followed by rows of VAL VAL LABEL, separated by tabs. 1126 1127 Allow 4+ column files that include trailing amplitudes. 1128 For now, there might be other event labels included, 1129 so go after trailing floats, until a more robust way 1130 can be defined. 1131 1132 Use the labels to set name and possibly fname fields. 
1133 1134 show_only : just show header details and return 1135 """ 1136 1137 tlist = [] # all AfniTiming instances to return 1138 1139 # if nothing passed, set default labels 1140 if hlabels == None: 1141 hlabels = g_tsv_def_labels 1142 1143 h0 = [] # original header, once set 1144 cdict = {} # dictionary of events per class type 1145 # - array of event lists, per run 1146 elist = [] # temporary variable, events for 1 run at a time 1147 for rind, fname in enumerate(flist): 1148 if show_only: print("\nparsing TSV file : %s\n" % fname) 1149 nvals, header, elist = parse_Ncol_tsv(fname, hlabels=hlabels, 1150 def_dur_lab=def_dur_lab, show_only=show_only, 1151 tsv_int=tsv_int, verb=verb) 1152 if show_only: continue 1153 if nvals <= 0: return 1, tlist 1154 1155 # store original header, else check for consistency 1156 if not h0: 1157 h0 = header 1158 if verb > 1: print('-- RM3CT: header = %s' % header) 1159 elif h0 != header and verb: 1160 print('** inconsistent column headers in 3 column tsv file %s' % fname) 1161 print(' orig: %s' % ' '.join(h0)) 1162 print(' current: %s' % ' '.join(header)) 1163 1164 # update list of class names, as they are found, 1165 # and add to cdict (including empty runs) 1166 for event in elist: 1167 cname = event[2] 1168 if cname not in list(cdict.keys()): 1169 cdict[cname] = [[]]*rind 1170 if verb > 4: 1171 print('++ RM3CT: init cdict[%s] with %s' % (cname, cdict[cname])) 1172 1173 # partition elist per known class (should be complete, as all class 1174 # names were added to dict) - okay if empty 1175 skeys = list(cdict.keys()) 1176 skeys.sort() 1177 for cname in skeys: 1178 # there might be an amplitude 1179 if nvals > 3: 1180 cevents = [[e[0], e[3], e[1]] for e in elist if e[2] == cname] 1181 else: 1182 cevents = [[e[0], [], e[1]] for e in elist if e[2] == cname] 1183 cdict[cname].append(cevents) 1184 if verb > 4: 1185 print('++ RM3CT: append cdict[%s] with %s' % (cname, cevents)) 1186 1187 # if just showing, we are done 1188 if show_only: 
return 0, [] 1189 1190 # now convert to AfniTiming instances 1191 # (sorted, to provide consistency across timing patterns) 1192 skeys = list(cdict.keys()) 1193 skeys.sort() 1194 for cname in skeys: 1195 mdata = cdict[cname] 1196 timing = AfniTiming(mdata=cdict[cname]) 1197 # init name and fname based on label, consider ability to change 1198 timing.name = cname 1199 timing.fname = 'times.%s.txt' % cname 1200 tlist.append(timing) 1201 if verb > 3: timing.show(mesg=('have timing for %s'%cname)) 1202 1203 return 0, tlist 1204 1205def parse_Ncol_tsv(fname, hlabels=g_tsv_def_labels, 1206 def_dur_lab=None, show_only=0, tsv_int=None, verb=1): 1207 """Read one N column tsv (tab separated value) file, and return: 1208 - ncol: -1 on error, else >= 0 1209 - header list (length ncol) 1210 - list of onset, duration, label values, amplitudes? 1211 1212 If all hlabels exist in header labels, then extract those columns. 1213 If all hlabels are integers, extract those 0-based columns. 1214 The format for hlabels should indicate : 1215 onset, duration, type and amplitudes, if any: 1216 1217 onset time, duration, trial type, amp1, amp2, ... ampA 1218 1219 def_dur_lab None : use default = 'missed' 1220 'missed' : add event as "missed_LABEL" class 1221 LABEL : alternate column label for missed events 1222 1223 show_only : show labels, then return 1224 1225 tsv_int : if set, write a smaller tsv 1226 (columns of interest - union of col_inds and cols_alt) 1227 1228 An N column tsv file should have an optional header line, 1229 followed by rows of VAL VAL LABEL, separated by tabs. 
1230 """ 1231 lines = UTIL.read_text_file(fname, lines=1) 1232 if len(lines) < 1: 1233 print("** failed parse_3col_tsv for '%s'" % fname) 1234 return -1, [], [] 1235 1236 # if show_only, be verbose 1237 if show_only and verb < 4: verb = 4 1238 1239 # pare lines down to useful ones 1240 newlines = [] 1241 norig = 0 1242 for lind, line in enumerate(lines): 1243 vv = line.split('\t') 1244 vlen = len(vv) 1245 if len == 0: 1246 if verb > 2: print('** skipping empty line %d of 3col tsv file %s' \ 1247 % (lind, fname)) 1248 continue 1249 1250 # possibly initialize norig 1251 if norig == 0: norig = vlen 1252 1253 # require consistency 1254 if vlen != norig: 1255 if verb: 1256 print('** line %d ncol=%d, mismatch with orig ncol %d' % \ 1257 (lind, vlen, norig)) 1258 print('** skipping bad line: %s' % line) 1259 continue 1260 1261 newlines.append(vv) 1262 1263 lines = newlines 1264 1265 if len(lines) < 1: 1266 if verb: print("** parse_Ncol_tsv for '%s' is empty" % fname) 1267 return -1, [], [] 1268 if norig < 3: 1269 if verb: print("** parse_Ncol_tsv: bad ncols = %d in %s" % (norig,fname)) 1270 return -1, [], [] 1271 1272 # decide on column extration indices, based on hlabels and lines[0:2] 1273 col_inds = tsv_hlabels_to_col_list(hlabels, lines, verb=verb) 1274 if len(col_inds) < 3: 1275 if verb: print("** failed to make tsv column index list in %s" % fname) 1276 return -1, [], [] 1277 1278 # ---------------------------------------- 1279 # decide how to handle n/a duration 1280 1281 # default to 'missed' 1282 if def_dur_lab == None: def_dur_lab = 'missed' 1283 1284 if def_dur_lab == 'missed': 1285 col_dur_alt = -1 1286 else: 1287 # replace duration label with alt, and see if it works 1288 h_alt = hlabels[:] 1289 h_alt[1] = def_dur_lab 1290 if verb > 1: print("\n-- processing for def_dur_label, %s" % def_dur_lab) 1291 cols_alt = tsv_hlabels_to_col_list(h_alt, lines, verb=verb) 1292 if verb > 2: print("++ have cols_alt: %s" \ 1293 % UTIL.int_list_string(cols_alt,sepstr=', ')) 
1294 if len(cols_alt) < 3: 1295 if verb: 1296 print("** could not find tsv column '%s' in %s"%(def_dur_lab,fname)) 1297 return -1, [], [] 1298 col_dur_alt = cols_alt[1] 1299 1300 # perhaps we want to write out cols of interest 1301 if not tsv_int is None: 1302 # first merge col_inds nand cols_alt, then write 1303 if verb > 1: 1304 print("== writing to tsv %s" % tsv_int) 1305 csub = col_inds[:] 1306 csub.extend(cols_alt) 1307 csub = UTIL.get_unique_sublist(csub) 1308 write_tsv_cols(lines, csub, ofile=tsv_int) 1309 1310 # if show_only, we are done 1311 if show_only: return 0, [], [] 1312 1313 # ---------------------------------------- 1314 # set header list, if a header exists 1315 header = [] 1316 l0 = lines[0] 1317 try: 1318 onset = float(l0[col_inds[0]]) 1319 dur = float(l0[col_inds[1]]) 1320 lab = l0[col_inds[2]].replace(' ', '_') # convert spaces to underscores 1321 except: 1322 l0 = lines.pop(0) 1323 header = [l0[col_inds[i]] for i in range(len(col_inds))] 1324 1325 # decide whether there are amplitudes 1326 ainds = col_inds[3:] 1327 1328 # ---------------------------------------- 1329 # now lines should be all: onset, duration, label and possibly amplitudes 1330 slist = [] 1331 oind = col_inds[0] 1332 dind = col_inds[1] 1333 lind = col_inds[2] 1334 for line_no, line in enumerate(lines): 1335 1336 # allow for alternate duration, another column or as a MISSING event 1337 missing_event = 0 1338 dur_txt = line[dind] 1339 if dur_txt in ['na', 'n/a', '']: 1340 if col_dur_alt >= 0: 1341 dur_txt = line[col_dur_alt] 1342 else: # then code as MISSING 1343 missing_event = 1 1344 dur_txt = '0' 1345 if verb > 2: 1346 print("-- line %d, swap dur txt [col %d] '%s' with [col %d] '%s'"\ 1347 % (line_no, dind, line[dind], col_dur_alt, dur_txt)) 1348 1349 try: 1350 onset = float(line[oind]) 1351 dur = float(dur_txt) 1352 lab = line[lind].replace(' ', '_') # convert spaces to underscores 1353 if len(ainds) > 0: 1354 amps = [float(line[aind]) for aind in ainds] 1355 except: 1356 
if verb: 1357 print('** bad line Ncol tsv file %s:\n %s' \ 1358 % (fname, ' '.join(line))) 1359 print(" dur_txt = '%s'" % dur_txt) 1360 return -1, [], [] 1361 1362 # append new event, possibly with a 'MISSED' label 1363 if missing_event: 1364 use_lab = 'MISSED_%s' % lab 1365 else: 1366 use_lab = lab 1367 if len(ainds) > 0: slist.append([onset, dur, use_lab, amps]) 1368 else: slist.append([onset, dur, use_lab]) 1369 1370 nuse = len(col_inds) 1371 1372 return nuse, header, slist 1373 1374def write_tsv_cols(table, cols, ofile='stdout'): 1375 1376 if ofile == '-' or ofile == 'stdout': 1377 fp = sys.stdout 1378 else: 1379 try: 1380 fp = open(ofile, 'w') 1381 except: 1382 print("** failed to open '%s' for writing tsv cols" % ofile) 1383 return 1 1384 1385 for line in table: 1386 fp.write('\t'.join(['%s' % line[c] for c in cols])) 1387 fp.write('\n') 1388 1389 if fp != sys.stdout: 1390 fp.close() 1391 1392 return 0 1393 1394def tsv_hlabels_to_col_list(hlabs, linelists, verb=1): 1395 """return a list of columns to extract, in order of: 1396 onset, duration, label, amplitude1, amplitude2 ... 1397 1398 if hlabs are integers, extract those columns 1399 else, try to find in linelists[0] 1400 if linelists[0] is not text, return 0, 1, 2, ... 1401 else if labels do not match 1402 if ncols matches, return 0, 1, 2, ... 
1403 else: fail 1404 """ 1405 1406 line0 = linelists[0] 1407 nlabs = len(hlabs) 1408 ncols = len(line0) 1409 1410 if nlabs < 3: 1411 if verb: print("** tsv hlabs has only %d entries: %s" % (nlabs, hlabs)) 1412 return [] 1413 if ncols < 3: 1414 if verb: print("** tsv file has only %d columns" % ncols) 1415 return [] 1416 1417 # first, decide whether hlabs are text labels or integers 1418 try: 1419 lints = [int(entry) for entry in hlabs] 1420 except: 1421 lints = [] 1422 have_label_ints = (len(lints) == len(hlabs)) 1423 1424 if verb > 2: 1425 print("-- TSV labels, wanted columns: %s" % ' '.join(hlabs)) 1426 print(" enumerated header cols:") 1427 lfound = '(chosen) ' 1428 lmissing = ' ' * len(lfound) 1429 for cind, label in enumerate(line0): 1430 if have_label_ints and cind in lints: 1431 lstr = lfound 1432 elif label in hlabs: 1433 lstr = lfound 1434 else: 1435 lstr = lmissing 1436 print(" col %02d %s: %s" % (cind, lstr, label)) 1437 1438 # if they are integers, we are done 1439 if len(lints) >= 3: 1440 if verb > 2: print("-- tsv labels is via index list: %s" % lints) 1441 return lints 1442 1443 1444 # decide whether line0 is text (count floats in line[0]) 1445 nfloat = ntext = 0 1446 for entry in line0: 1447 try: 1448 fval = float(entry) 1449 nfloat += 1 1450 except: 1451 ntext += 1 1452 1453 # if at least 2 floats, assume no header and return 1454 if nfloat >= 2: 1455 if verb > 1: print("-- tsv file has no header, using first columns") 1456 # so hlabs must now be taken sequential, just check the length 1457 if nlabs <= ncols: 1458 return [i for i in range(nlabs)] 1459 else: 1460 return [i for i in range(ncols)] 1461 1462 # so we have text in the header, check whether all hlabs are found 1463 lints = [] 1464 for label in hlabs: 1465 if label in line0: 1466 lints.append(line0.index(label)) 1467 elif verb > 0: 1468 print("** missing chosen label '%s'" % label) 1469 1470 # if we did not find them all, require same ncols or fail 1471 if len(lints) < nlabs: 1472 if nlabs 
== ncols: 1473 if verb: print("-- tsv has unexpected labels, assuming sequential") 1474 return [i for i in range(nlabs)] 1475 else: 1476 if verb > 0: 1477 print("** tsv file has unexpected labels, failing") 1478 if verb < 3: print(" (consider -verb 3)") 1479 return [] 1480 1481 # all are found, yay! 1482 if verb > 1: print("++ found expected labels in tsv file") 1483 if verb > 2: 1484 print("-- labels : %s" % ', '.join(hlabs)) 1485 print("-- index list : %s" % UTIL.int_list_string(lints, sepstr=', ')) 1486 1487 return lints 1488 1489def parse_3col_tsv(fname, verb=1): 1490 """Read one 3 column tsv (tab separated value) file, and return: 1491 - status (0 if okay) 1492 - header list (length 3?) 1493 - list of onset, duration, label values 1494 1495 A 3 column tsv file should have an optional header line, 1496 followed by rows of VAL VAL LABEL, separated by tabs. 1497 """ 1498 lines = UTIL.read_text_file(fname, lines=1) 1499 if len(lines) < 1: 1500 print("** parse_3col_tsv: failed to read text file '%s'" % fname) 1501 return 1, [], [] 1502 1503 # pare lines down to useful ones 1504 newlines = [] 1505 for lind, line in enumerate(lines): 1506 vv = line.split('\t') 1507 if len(vv) == 3: 1508 newlines.append(vv) 1509 elif len(vv) > 0: 1510 print('** skipping bad line %d of 3col tsv file %s' % (lind, fname)) 1511 if verb > 2: print(' vals[%d] = %s' % (len(vv),vv)) 1512 elif verb > 2: 1513 print('** skipping empty line %d of 3col tsv file %s' % (lind, fname)) 1514 lines = newlines 1515 1516 if len(lines) < 1: 1517 print("** parse_3col_tsv for '%s' is empty" % fname) 1518 return 1, [], [] 1519 1520 # set header list, if a header exists 1521 header = [] 1522 l0 = lines[0] 1523 try: 1524 onset = float(l0[0]) 1525 dur = float(l0[1]) 1526 lab = l0[2].replace(' ', '_') # and convert spaces to underscores 1527 except: 1528 header = lines.pop(0) 1529 1530 # now lines should be all: onset, duration, label 1531 slist = [] 1532 for line in lines: 1533 try: 1534 onset = float(line[0]) 
1535 dur = float(line[1]) 1536 lab = line[2].replace(' ', '_') # convert spaces to underscores 1537 except: 1538 print('** bad line 3col tsv file %s: %s' % (fname, ' '.join(line))) 1539 return 1, [], [] 1540 slist.append([onset, dur, lab]) 1541 1542 return 0, header, slist 1543 1544def read_value_file(fname): 1545 """read value file, returning generic values in a matrix (no comments)""" 1546 try: fp = open(fname, 'r') 1547 except: 1548 print("** failed to open value file '%s'" % fname) 1549 return None 1550 1551 data = [] # data lines 1552 1553 lind = 0 1554 for line in fp.readlines(): 1555 lind += 1 1556 lary = line.split() 1557 if len(lary) == 0: continue 1558 if lary[0] == '#': continue 1559 1560 data.append([x for x in lary if x != '*']) 1561 1562 fp.close() 1563 1564 return data 1565 1566