1#!/usr/bin/env python3
2#
3# Copyright (C) 2020, 2021 Collabora Limited
4# Author: Gustavo Padovan <gustavo.padovan@collabora.com>
5#
6# Permission is hereby granted, free of charge, to any person obtaining a
7# copy of this software and associated documentation files (the "Software"),
8# to deal in the Software without restriction, including without limitation
9# the rights to use, copy, modify, merge, publish, distribute, sublicense,
10# and/or sell copies of the Software, and to permit persons to whom the
11# Software is furnished to do so, subject to the following conditions:
12#
13# The above copyright notice and this permission notice (including the next
14# paragraph) shall be included in all copies or substantial portions of the
15# Software.
16#
17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
20# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23# SOFTWARE.
24
25"""Send a job to LAVA, track it and collect log back"""
26
27import argparse
28import lavacli
29import os
30import sys
31import time
32import traceback
33import urllib.parse
34import xmlrpc
35import yaml
36
37from datetime import datetime, timedelta
38from lavacli.utils import loader
39
40# Timeout in minutes to decide if the device from the dispatched LAVA job has
41# hung or not due to the lack of new log output.
42DEVICE_HANGING_TIMEOUT_MIN = 5
43
44# How many seconds the script should wait before try a new polling iteration to
45# check if the dispatched LAVA job is running or waiting in the job queue.
46WAIT_FOR_DEVICE_POLLING_TIME_SEC = 10
47
48# How many seconds to wait between log output LAVA RPC calls.
49LOG_POLLING_TIME_SEC = 5
50
51# How many retries should be made when a timeout happen.
52NUMBER_OF_RETRIES_TIMEOUT_DETECTION = 2
53
54
55def print_log(msg):
56    print("{}: {}".format(datetime.now(), msg))
57
58def fatal_err(msg):
59    print_log(msg)
60    sys.exit(1)
61
62def generate_lava_yaml(args):
63    # General metadata and permissions, plus also inexplicably kernel arguments
64    values = {
65        'job_name': 'mesa: {}'.format(args.pipeline_info),
66        'device_type': args.device_type,
67        'visibility': { 'group': [ args.visibility_group ] },
68        'priority': 75,
69        'context': {
70            'extra_nfsroot_args': ' init=/init rootwait minio_results={}'.format(args.job_artifacts_base)
71        },
72        'timeouts': {
73            'job': {
74                'minutes': args.job_timeout
75            }
76        },
77    }
78
79    if args.lava_tags:
80        values['tags'] = args.lava_tags.split(',')
81
82    # URLs to our kernel rootfs to boot from, both generated by the base
83    # container build
84    deploy = {
85      'timeout': { 'minutes': 10 },
86      'to': 'tftp',
87      'os': 'oe',
88      'kernel': {
89        'url': '{}/{}'.format(args.base_system_url_prefix, args.kernel_image_name),
90      },
91      'nfsrootfs': {
92        'url': '{}/lava-rootfs.tgz'.format(args.base_system_url_prefix),
93        'compression': 'gz',
94      }
95    }
96    if args.kernel_image_type:
97        deploy['kernel']['type'] = args.kernel_image_type
98    if args.dtb:
99        deploy['dtb'] = {
100          'url': '{}/{}.dtb'.format(args.base_system_url_prefix, args.dtb)
101        }
102
103    # always boot over NFS
104    boot = {
105      'timeout': { 'minutes': 25 },
106      'method': args.boot_method,
107      'commands': 'nfs',
108      'prompts': ['lava-shell:'],
109    }
110
111    # skeleton test definition: only declaring each job as a single 'test'
112    # since LAVA's test parsing is not useful to us
113    test = {
114      'timeout': { 'minutes': args.job_timeout },
115      'failure_retry': 1,
116      'definitions': [ {
117        'name': 'mesa',
118        'from': 'inline',
119        'path': 'inline/mesa.yaml',
120        'repository': {
121          'metadata': {
122            'name': 'mesa',
123            'description': 'Mesa test plan',
124            'os': [ 'oe' ],
125            'scope': [ 'functional' ],
126            'format': 'Lava-Test Test Definition 1.0',
127          },
128          'parse': {
129            'pattern': r'hwci: (?P<test_case_id>\S*):\s+(?P<result>(pass|fail))'
130          },
131          'run': {
132          },
133        },
134      } ],
135    }
136
137    # job execution script:
138    #   - inline .gitlab-ci/common/init-stage1.sh
139    #   - fetch and unpack per-pipeline build artifacts from build job
140    #   - fetch and unpack per-job environment from lava-submit.sh
141    #   - exec .gitlab-ci/common/init-stage2.sh
142    init_lines = []
143    with open(args.first_stage_init, 'r') as init_sh:
144      init_lines += [ x.rstrip() for x in init_sh if not x.startswith('#') and x.rstrip() ]
145    init_lines += [
146      'mkdir -p {}'.format(args.ci_project_dir),
147      'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.mesa_build_url, args.ci_project_dir),
148      'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),
149      'set +x',
150      'export CI_JOB_JWT="{}"'.format(args.jwt),
151      'set -x',
152      'exec /init-stage2.sh',
153    ]
154    test['definitions'][0]['repository']['run']['steps'] = init_lines
155
156    values['actions'] = [
157      { 'deploy': deploy },
158      { 'boot': boot },
159      { 'test': test },
160    ]
161
162    return yaml.dump(values, width=10000000)
163
164
165def setup_lava_proxy():
166    config = lavacli.load_config("default")
167    uri, usr, tok = (config.get(key) for key in ("uri", "username", "token"))
168    uri_obj = urllib.parse.urlparse(uri)
169    uri_str = "{}://{}:{}@{}{}".format(uri_obj.scheme, usr, tok, uri_obj.netloc, uri_obj.path)
170    transport = lavacli.RequestsTransport(
171        uri_obj.scheme,
172        config.get("proxy"),
173        config.get("timeout", 120.0),
174        config.get("verify_ssl_cert", True),
175    )
176    proxy = xmlrpc.client.ServerProxy(
177        uri_str, allow_none=True, transport=transport)
178
179    print_log("Proxy for {} created.".format(config['uri']))
180
181    return proxy
182
183
184def _call_proxy(fn, *args):
185    retries = 60
186    for n in range(1, retries + 1):
187        try:
188            return fn(*args)
189        except xmlrpc.client.ProtocolError as err:
190            if n == retries:
191                traceback.print_exc()
192                fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
193            else:
194                time.sleep(15)
195                pass
196        except xmlrpc.client.Fault as err:
197            traceback.print_exc()
198            fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
199
200
201def get_job_results(proxy, job_id, test_suite, test_case):
202    # Look for infrastructure errors and retry if we see them.
203    results_yaml = _call_proxy(proxy.results.get_testjob_results_yaml, job_id)
204    results = yaml.load(results_yaml, Loader=loader(False))
205    for res in results:
206        metadata = res['metadata']
207        if not 'result' in metadata or metadata['result'] != 'fail':
208            continue
209        if 'error_type' in metadata and metadata['error_type'] == "Infrastructure":
210            print_log("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
211            return False
212        if 'case' in metadata and metadata['case'] == "validate":
213            print_log("LAVA job {} failed validation (possible download error). Retry.".format(job_id))
214            return False
215
216    results_yaml = _call_proxy(proxy.results.get_testcase_results_yaml, job_id, test_suite, test_case)
217    results = yaml.load(results_yaml, Loader=loader(False))
218    if not results:
219        fatal_err("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))
220
221    print_log("LAVA: result for test_suite '{}', test_case '{}': {}".format(test_suite, test_case, results[0]['result']))
222    if results[0]['result'] != 'pass':
223        fatal_err("FAIL")
224
225    return True
226
227def wait_until_job_is_started(proxy, job_id):
228    print_log(f"Waiting for job {job_id} to start.")
229    current_state = "Submitted"
230    waiting_states = ["Submitted", "Scheduling", "Scheduled"]
231    while current_state in waiting_states:
232        job_state = _call_proxy(proxy.scheduler.job_state, job_id)
233        current_state = job_state["job_state"]
234
235        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
236    print_log(f"Job {job_id} started.")
237
238def follow_job_execution(proxy, job_id):
239    line_count = 0
240    finished = False
241    last_time_logs = datetime.now()
242    while not finished:
243        (finished, data) = _call_proxy(proxy.scheduler.jobs.logs, job_id, line_count)
244        logs = yaml.load(str(data), Loader=loader(False))
245        if logs:
246            # Reset the timeout
247            last_time_logs = datetime.now()
248            for line in logs:
249                print("{} {}".format(line["dt"], line["msg"]))
250
251            line_count += len(logs)
252
253        else:
254            time_limit = timedelta(minutes=DEVICE_HANGING_TIMEOUT_MIN)
255            if datetime.now() - last_time_logs > time_limit:
256                print_log("LAVA job {} doesn't advance (machine got hung?). Retry.".format(job_id))
257                return False
258
259        # `proxy.scheduler.jobs.logs` does not block, even when there is no
260        # new log to be fetched. To avoid dosing the LAVA dispatcher
261        # machine, let's add a sleep to save them some stamina.
262        time.sleep(LOG_POLLING_TIME_SEC)
263
264    return True
265
266def show_job_data(proxy, job_id):
267    show = _call_proxy(proxy.scheduler.jobs.show, job_id)
268    for field, value in show.items():
269        print("{}\t: {}".format(field, value))
270
271
272def validate_job(proxy, job_file):
273    try:
274        return _call_proxy(proxy.scheduler.jobs.validate, job_file, True)
275    except:
276        return False
277
278def submit_job(proxy, job_file):
279    return _call_proxy(proxy.scheduler.jobs.submit, job_file)
280
281
282def main(args):
283    proxy = setup_lava_proxy()
284
285    yaml_file = generate_lava_yaml(args)
286
287    if args.dump_yaml:
288        censored_args = args
289        censored_args.jwt = "jwt-hidden"
290        print(generate_lava_yaml(censored_args))
291
292    if args.validate_only:
293        ret = validate_job(proxy, yaml_file)
294        if not ret:
295            fatal_err("Error in LAVA job definition")
296        print("LAVA job definition validated successfully")
297        return
298
299    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
300
301    while retry_count >= 0:
302        job_id = submit_job(proxy, yaml_file)
303
304        print_log("LAVA job id: {}".format(job_id))
305
306        wait_until_job_is_started(proxy, job_id)
307
308        if not follow_job_execution(proxy, job_id):
309            print_log(f"Job {job_id} has timed out. Cancelling it.")
310            # Cancel the job as it is considered unreachable by Mesa CI.
311            proxy.scheduler.jobs.cancel(job_id)
312
313            retry_count -= 1
314            continue
315
316        show_job_data(proxy, job_id)
317
318        if get_job_results(proxy,  job_id, "0_mesa", "mesa") == True:
319             break
320
321
322if __name__ == '__main__':
323    # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
324    # GitLab runner -> GitLab primary -> user, safe to say we don't need any
325    # more buffering
326    sys.stdout.reconfigure(line_buffering=True)
327    sys.stderr.reconfigure(line_buffering=True)
328    parser = argparse.ArgumentParser("LAVA job submitter")
329
330    parser.add_argument("--pipeline-info")
331    parser.add_argument("--base-system-url-prefix")
332    parser.add_argument("--mesa-build-url")
333    parser.add_argument("--job-rootfs-overlay-url")
334    parser.add_argument("--job-artifacts-base")
335    parser.add_argument("--job-timeout", type=int)
336    parser.add_argument("--first-stage-init")
337    parser.add_argument("--ci-project-dir")
338    parser.add_argument("--device-type")
339    parser.add_argument("--dtb", nargs='?', default="")
340    parser.add_argument("--kernel-image-name")
341    parser.add_argument("--kernel-image-type", nargs='?', default="")
342    parser.add_argument("--boot-method")
343    parser.add_argument("--lava-tags", nargs='?', default="")
344    parser.add_argument("--jwt")
345    parser.add_argument("--validate-only", action='store_true')
346    parser.add_argument("--dump-yaml", action='store_true')
347    parser.add_argument("--visibility-group")
348
349    parser.set_defaults(func=main)
350    args = parser.parse_args()
351    args.func(args)
352