#!/usr/bin/env python3
"""Run kvazaar in fast-RD sampling mode over a set of sequences and QPs.

For every (sequence, base QP) job a kvazaar process is started with
``--fastrd-sampling`` and ``--fastrd-outdir`` pointing at a directory of
named pipes (one per destination QP). Each pipe is drained by a thread that
gzips whatever arrives into a ``NN.txt.gz`` file next to the pipe.
"""

import glob
import gzip
import os
import subprocess
import threading
import time

# Where logs and sampled data will wind up, and where the sequences are read.
# Do note that the sequences variable is supposed to be a tuple, because you
# could have multiple sets of sequences.
logdir = "/tmp/rdcost/logs"
ofdir = "/tmp/rdcost/data"
sequences = ("/opt/test_seqs/custom_seqs/*/*.yuv",)

# Note that n_kvazaars * len(dest_qps) has to be less than the max number of
# fds that a process can have (check it out: ulimit -a, likely 1024)
smt_threads = 8    # Kinda lazy, but just match this to your cpu
n_kvz_threads = 1  # How many threads each kvz instance is running?
n_kvazaars = smt_threads // n_kvz_threads


# You likely will not need to change anything below this line
def kvz_srcdir(path):
    """Return *path* resolved inside the ``src`` directory of the source tree.

    The tree root is taken to be the parent of the directory that holds this
    script (symlinks resolved).
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(os.path.dirname(script_dir), "src", path)


dest_qps = tuple(range(51))
base_qps = tuple(range(12, 43))

kvzargs = [kvz_srcdir("kvazaar"), "--threads", str(n_kvz_threads), "--preset=ultrafast", "--fastrd-sampling", "--fast-residual-cost=0"]
kvzenv = {"LD_LIBRARY_PATH": kvz_srcdir(".libs/")}

# How long (seconds) to wait for a gzip thread between wake-up attempts.
GZIP_JOIN_POLL_S = 0.1


def _unlink_if_exists(path):
    """Remove *path*, silently accepting that it may already be gone."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


class MultiPipeGZOutManager:
    """Context manager owning one FIFO (and one .gz output name) per QP.

    On entry the output directory and the FIFOs are created (stale files
    with the same names are replaced); on exit the FIFOs are removed again.
    The ``.gz`` files themselves are only named here, not created.
    """

    pipe_fn_template = "%02i.txt"
    gzout_fn_template = "%02i.txt.gz"

    def __init__(self, odpath, dest_qps):
        """Precompute the FIFO and gzip output paths under *odpath*."""
        self.odpath = odpath
        self.dest_qps = dest_qps

        self.pipe_fns = []
        self.gzout_fns = []
        for qp in dest_qps:
            pipe_fn = os.path.join(self.odpath, self.pipe_fn_template % qp)
            gzout_fn = os.path.join(self.odpath, self.gzout_fn_template % qp)

            self.pipe_fns.append(pipe_fn)
            self.gzout_fns.append(gzout_fn)

    def __enter__(self):
        os.makedirs(self.odpath, exist_ok=True)
        for pipe_fn in self.pipe_fns:
            # A leftover file from an earlier run would make mkfifo fail.
            _unlink_if_exists(pipe_fn)
            os.mkfifo(pipe_fn)
        return self

    def __exit__(self, *_):
        # Tolerate an already-missing FIFO so that cleanup never masks the
        # exception (if any) that is propagating out of the with-block.
        for pipe_fn in self.pipe_fns:
            _unlink_if_exists(pipe_fn)

    def items(self):
        """Yield ``(pipe_fn, gzout_fn)`` pairs in *dest_qps* order."""
        for pipe_fn, gzout_fn in zip(self.pipe_fns, self.gzout_fns):
            yield (pipe_fn, gzout_fn)


class MTSafeIterable:
    """Iterator wrapper that serialises ``next()`` calls with a lock.

    Lets several worker threads pull items from one shared iterable (for
    example a generator, which must not be advanced concurrently).
    """

    def __init__(self, iterable):
        self.lock = threading.Lock()
        # iter() is a no-op for iterators and makes lists, tuples etc. work.
        self.iterable = iter(iterable)

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.iterable)


def combinations(xi, yi):
    """Yield every ``(x, y)`` pair, iterating *yi* once per item of *xi*.

    *yi* must be re-iterable (e.g. a tuple); *xi* is consumed lazily.
    """
    for x in xi:
        for y in yi:
            yield (x, y)


def chain(lol):
    """Flatten an iterable of iterables by one level, lazily."""
    for l in lol:
        yield from l


# Would've used Popen with gzip, but "gzip [fifo]" with an unconnected fifo
# will detect the situation and not block, but just consider it an empty
# file. Don't like it when tools outsmart their user..
def do_gzip(in_fn, out_fn):
    """Copy *in_fn* into the gzip file *out_fn* until end-of-file.

    Opening *in_fn* blocks for as long as the OS makes it block (for a FIFO:
    until a writer connects). Progress is printed every
    ``BLOCK_SZ * PRINT_MULT`` bytes, plus a final summary line.
    """
    BLOCK_SZ = 65536
    PRINT_MULT = 1024
    progress_step = BLOCK_SZ * PRINT_MULT

    num_read = 0
    print_next_thres = progress_step
    with open(in_fn, "rb") as inf, gzip.open(out_fn, "wb") as outf:
        # read() returns b"" only at end-of-file, which ends the loop.
        for block in iter(lambda: inf.read(BLOCK_SZ), b""):
            num_read += len(block)
            if num_read >= print_next_thres:
                print(" read %8i MB from %s" % (num_read / (1024 * 1024), in_fn))
                print_next_thres += progress_step
            outf.write(block)

    print(" finished %8i MB from %s" % (num_read / (1024 * 1024), in_fn))


def _release_fifo_reader(pipe_fn):
    """Give a reader of *pipe_fn* that has no writer an end-of-file.

    Opens the FIFO write-only in non-blocking mode and closes it straight
    away. A reader still blocked in ``open()`` is released by this and then
    reads EOF. If nobody is reading, the open fails and nothing happens.
    """
    try:
        fd = os.open(pipe_fn, os.O_WRONLY | os.O_NONBLOCK)
    except OSError:
        # No reader attached (not yet, or not any more), or the FIFO is gone.
        return
    os.close(fd)


def _join_gzip_threads(threads_and_pipes):
    """Wait for every gzip thread, waking readers that never got a writer."""
    for thread, pipe_fn in threads_and_pipes:
        # Retry: the thread may not have reached its open() call yet.
        while thread.is_alive():
            _release_fifo_reader(pipe_fn)
            thread.join(GZIP_JOIN_POLL_S)


def run_job(job):
    """Encode one ``(input path, qp)`` job and gzip the sampled data.

    kvazaar's stderr goes to ``<logdir>/<input>-qp<qp>.log``, the bitstream
    to ``/tmp/<input>-qp<qp>.hevc`` and the gzipped samples to
    ``<ofdir>/<input>-qp<qp>/``. Returns only after kvazaar has exited and
    all gzip threads have finished.
    """
    ifpath, qp = job
    ifname = os.path.basename(ifpath)

    jobname = "%s-qp%i" % (ifname, qp)
    hevcname = "%s.hevc" % jobname
    logname = "%s.log" % jobname
    odname = jobname

    hevcpath = os.path.join("/tmp", hevcname)
    logpath = os.path.join(logdir, logname)
    odpath = os.path.join(ofdir, odname)

    my_kvzargs = kvzargs + ["-i", ifpath,
                            "--qp", str(qp),
                            "-o", hevcpath,
                            "--fastrd-outdir", odpath]

    with open(logpath, "w") as lf:
        with MultiPipeGZOutManager(odpath, dest_qps) as pipes_and_outputs:
            gzip_threads = []
            try:
                # Readers must be running before kvazaar starts writing.
                for pipe_fn, out_fn in pipes_and_outputs.items():
                    gzip_thread = threading.Thread(target=do_gzip, args=(pipe_fn, out_fn))
                    gzip_thread.start()
                    gzip_threads.append((gzip_thread, pipe_fn))

                kvz = subprocess.Popen(my_kvzargs, env=kvzenv, stderr=lf)
                returncode = kvz.wait()
                if returncode != 0:
                    print(" kvazaar exited with status %i for %s (see %s)" % (returncode, jobname, logpath))
            finally:
                # Join while the FIFOs still exist. Otherwise a reader that
                # kvazaar never connected to (including when Popen itself
                # fails) would stay blocked forever and keep the process
                # from exiting.
                _join_gzip_threads(gzip_threads)


def threadfunc(joblist):
    """Worker loop: run jobs from the shared *joblist* until it is empty."""
    for job in joblist:
        run_job(job)


def main():
    """Create the output directories and run all jobs on n_kvazaars threads."""
    # Not an assert: those are stripped when Python runs with -O.
    if not isinstance(sequences, tuple):
        raise TypeError("sequences must be a tuple of glob patterns")
    for d in (logdir, ofdir):
        os.makedirs(d, exist_ok=True)

    jobs = combinations(chain(map(glob.glob, sequences)), base_qps)
    joblist = MTSafeIterable(jobs)

    threads = [threading.Thread(target=threadfunc, args=(joblist,)) for _ in range(n_kvazaars)]
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()