1#!/usr/bin/env python3
2
3import glob
4import gzip
5import os
6import subprocess
7import threading
8import time
9
10# Where logs and sampled data will wind up, and where the sequences are read.
11# Do note that the sequences variable is supposed to be a tuple, because you
12# could have multiple sets of sequences.
13logdir    =  "/tmp/rdcost/logs"
14ofdir     =  "/tmp/rdcost/data"
15sequences = ("/opt/test_seqs/custom_seqs/*/*.yuv",)
16
17# Note that n_kvazaars * len(dest_qps) has to be less than the max number of
18# fd's that a process can have (check it out: ulimit -a, likely 1024)
19smt_threads   = 8 # Kinda lazy, but just match this to your cpu
20n_kvz_threads = 1 # How many threads each kvz instance is running?
21n_kvazaars    = smt_threads // n_kvz_threads
22
23# You likely will not need to change anything below this line
24kvz_srcdir    = lambda path: os.path.join(
25                                 os.path.dirname(
26                                     os.path.dirname(
27                                         os.path.realpath(__file__)
28                                     )
29                                 ), "src", path)
30
31
32dest_qps      = tuple(range(51))
33base_qps      = tuple(range(12, 43))
34
35kvzargs       = [kvz_srcdir("kvazaar"), "--threads", str(n_kvz_threads), "--preset=ultrafast", "--fastrd-sampling", "--fast-residual-cost=0"]
36kvzenv        = {"LD_LIBRARY_PATH": kvz_srcdir(".libs/")}
37
class MultiPipeGZOutManager:
    """Context manager owning one named pipe (FIFO) per destination QP.

    On entry it creates ``odpath`` and a FIFO ``<odpath>/NN.txt`` for every QP
    in ``dest_qps``, replacing any stale file with that name. ``items()`` pairs
    each FIFO path with the ``<odpath>/NN.txt.gz`` path its contents should be
    compressed into. On exit the FIFOs are removed again; the .gz files are
    left in place.
    """

    pipe_fn_template  = "%02i.txt"
    gzout_fn_template = "%02i.txt.gz"

    def __init__(self, odpath, dest_qps):
        """Compute the FIFO and .gz paths only; nothing is created on disk here."""
        self.odpath = odpath
        self.dest_qps = dest_qps

        # Single pass over dest_qps, so a one-shot iterator also works.
        self.pipe_fns  = []
        self.gzout_fns = []
        for qp in dest_qps:
            self.pipe_fns.append(os.path.join(odpath, self.pipe_fn_template % qp))
            self.gzout_fns.append(os.path.join(odpath, self.gzout_fn_template % qp))

    def __enter__(self):
        """Create the output directory and a fresh FIFO for every QP."""
        os.makedirs(self.odpath, exist_ok=True)
        for pipe_fn in self.pipe_fns:
            # mkfifo fails if the path exists, so drop leftovers from earlier runs.
            try:
                os.unlink(pipe_fn)
            except FileNotFoundError:
                pass
            os.mkfifo(pipe_fn)
        return self

    def __exit__(self, *_):
        """Remove every FIFO; never suppresses an exception from the with-body.

        A FIFO that is already gone is skipped, as in __enter__. Otherwise one
        missing path would abort cleanup of the remaining FIFOs and replace
        whatever exception was propagating out of the with-block.
        """
        for pipe_fn in self.pipe_fns:
            try:
                os.unlink(pipe_fn)
            except FileNotFoundError:
                pass

    def items(self):
        """Yield ``(fifo_path, gz_output_path)`` pairs in ``dest_qps`` order."""
        yield from zip(self.pipe_fns, self.gzout_fns)
72
class MTSafeIterable:
    """Wrap an iterable so several threads can pull items from it safely.

    Every ``next()`` call is serialised by a lock, so each item is handed to
    exactly one consumer. This lets a plain generator, which cannot be
    advanced from two threads at once, act as a shared work queue.
    """

    def __init__(self, iterable):
        self.lock = threading.Lock()
        # iter() accepts any iterable (list, tuple, ...). For an iterator or
        # generator it returns the object itself, so existing callers are
        # unaffected.
        self.iterable = iter(iterable)

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.iterable)
84
def combinations(xi, yi):
    """Yield every ``(x, y)`` pair with x from *xi* and y from *yi*, x-major.

    Despite the name this is a Cartesian product, like
    ``itertools.product(xi, yi)``, not ``itertools.combinations``. *xi* is
    consumed lazily, one element at a time. *yi* is materialised once, on the
    first ``next()``, so it may also be a one-shot iterator. Re-iterating it
    for every x would silently leave all x after the first without partners.
    """
    ys = tuple(yi)
    for x in xi:
        for y in ys:
            yield (x, y)
89
def chain(lol):
    """Flatten one level of nesting.

    Yields each element of each iterable in *lol*, in order.
    """
    for sublist in lol:
        yield from sublist
94
# Compression is done in-process rather than via Popen(["gzip", fifo]).
# gzip notices when it is handed a FIFO with no writer attached yet and
# treats it as an empty file instead of blocking until data arrives.
def do_gzip(in_fn, out_fn):
    """Stream *in_fn* into the gzip file *out_fn*, reporting progress.

    The input is read in 64 KiB chunks until EOF. For a FIFO, opening it here
    blocks until some writer opens the other end. A "read" line is printed
    each time another 64 MiB has passed through, and a "finished" line once
    the input is exhausted.
    """
    chunk_size = 65536
    report_every = chunk_size * 1024
    mib = 1024 * 1024

    total = 0
    next_report = report_every
    with open(in_fn, "rb") as src, gzip.open(out_fn, "wb") as dst:
        for chunk in iter(lambda: src.read(chunk_size), b""):
            total += len(chunk)
            if total >= next_report:
                print("    read     %8i MB from %s" % (total / mib, in_fn))
                next_report += report_every
            dst.write(chunk)

        print("    finished %8i MB from %s" % (total / mib, in_fn))
116
def _unblock_fifo_reader(pipe_fn):
    """Open and immediately close *pipe_fn*'s write end without blocking.

    A reader stuck in open() on the FIFO is woken by the writer's open; once
    the writer closes again, the reader's read() sees EOF.
    """
    try:
        fd = os.open(pipe_fn, os.O_WRONLY | os.O_NONBLOCK)
    except OSError:
        # ENXIO: no reader has the FIFO open right now (it has finished or
        # has not reached open() yet). ENOENT: FIFO already gone.
        return
    os.close(fd)


def _join_gzip_threads(threads, pipe_fns, poll_interval=1.0):
    """Wait for every do_gzip thread, releasing any still blocked on its FIFO.

    Only call this once nothing else will write to the FIFOs. From then on, a
    reader still waiting for a writer would wait forever, so it is poked until
    it finishes.
    """
    for thread, pipe_fn in zip(threads, pipe_fns):
        while thread.is_alive():
            _unblock_fifo_reader(pipe_fn)
            thread.join(poll_interval)


def run_job(job):
    """Encode one input at one QP with kvazaar and gzip its sampled output.

    *job* is an ``(input_path, qp)`` pair. kvazaar's stderr goes to
    ``<logdir>/<name>-qp<qp>.log`` and its bitstream to
    ``/tmp/<name>-qp<qp>.hevc``. ``--fastrd-outdir`` points at
    ``<ofdir>/<name>-qp<qp>/``, where one FIFO per destination QP is created
    and drained into a matching ``.txt.gz`` by a do_gzip thread. The
    assumption (TODO confirm) is that kvazaar writes one ``NN.txt`` per
    destination QP there.

    All gzip threads are joined before the FIFOs are removed, including when
    kvazaar fails to start or never opens some FIFO. Without that, those
    threads would block in open() forever and keep the interpreter alive.
    """
    ifpath, qp = job
    ifname = os.path.basename(ifpath)

    jobname  = "%s-qp%i" % (ifname, qp)
    hevcname = "%s.hevc" % jobname
    logname  = "%s.log"  % jobname
    odname   = jobname

    hevcpath = os.path.join("/tmp", hevcname)
    logpath  = os.path.join(logdir, logname)
    odpath   = os.path.join(ofdir,  odname)

    my_kvzargs = kvzargs + ["-i",              ifpath,
                            "--qp",            str(qp),
                            "-o",              hevcpath,
                            "--fastrd-outdir", odpath]

    with open(logpath, "w") as lf:
        with MultiPipeGZOutManager(odpath, dest_qps) as pipes_and_outputs:
            gzip_threads = []
            try:
                # Readers must be waiting before kvazaar starts writing.
                for pipe_fn, out_fn in pipes_and_outputs.items():
                    gzip_thread = threading.Thread(target=do_gzip, args=(pipe_fn, out_fn))
                    gzip_thread.start()
                    gzip_threads.append(gzip_thread)

                kvz = subprocess.Popen(my_kvzargs, env=kvzenv, stderr=lf)
                retcode = kvz.wait()
            finally:
                # Still inside the manager, so the FIFOs exist while the
                # readers are drained and joined.
                _join_gzip_threads(gzip_threads, pipes_and_outputs.pipe_fns)

    if retcode != 0:
        print("    kvazaar exited with status %i for %s, see %s" % (retcode, jobname, logpath))
145
def threadfunc(joblist):
    """Worker loop: run jobs pulled from *joblist* until it is exhausted.

    main() gives every worker the same MTSafeIterable, so each job is run by
    exactly one worker. A job that raises is reported and skipped rather than
    ending the worker. Otherwise every failure would permanently retire one
    worker, and once all had died the remaining jobs would silently never run.
    """
    import traceback

    for job in joblist:
        try:
            run_job(job)
        except Exception:
            print("job %r failed:" % (job,))
            traceback.print_exc()
149
def main():
    """Run every (input sequence, base QP) job, ``n_kvazaars`` at a time.

    Jobs are the Cartesian product of the files matched by the ``sequences``
    glob patterns and ``base_qps``. ``n_kvazaars`` worker threads pull them
    from a single lock-protected iterator.

    Raises:
        TypeError: if ``sequences`` is not a tuple of glob patterns.
    """
    # A bare string (e.g. a forgotten trailing comma) would be iterated
    # character by character, and each character globbed as a pattern. This
    # is an explicit raise because `assert` is stripped under `python -O`.
    if not isinstance(sequences, tuple):
        raise TypeError("sequences must be a tuple of glob patterns, got %r" % (sequences,))
    for d in (logdir, ofdir):
        os.makedirs(d, exist_ok=True)

    # glob's result order depends on the filesystem; sort for a reproducible
    # job order.
    inputs = chain(sorted(glob.glob(pattern)) for pattern in sequences)
    joblist = MTSafeIterable(combinations(inputs, base_qps))

    threads = [threading.Thread(target=threadfunc, args=(joblist,)) for _ in range(n_kvazaars)]
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()
164
# Only run the batch when executed as a script, not on import.
if __name__ == "__main__":
    main()
167