#
#  Copyright (C) 2018 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
#        Jürg Billeter <juerg.billeter@codethink.co.uk>
#        Tristan Maat <tristan.maat@codethink.co.uk>

import os
import sys
import stat
import shlex
import shutil
import tarfile
from contextlib import contextmanager
from tempfile import TemporaryDirectory

from ._exceptions import StreamError, ImplError, BstError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
from . import utils, _yaml, _site
from . import Scope, Consistency

# Stream()
#
# This is the main, toplevel calling interface in BuildStream core.
#
# Args:
#    context (Context): The Context object
#    project (Project): The Project object
#    session_start (datetime): The time when the session started
#    session_start_callback (callable): A callback to invoke when the session starts
#    interrupt_callback (callable): A callback to invoke when we get interrupted
#    ticker_callback (callable): Invoked every second while running the scheduler
#    job_start_callback (callable): Called when a job starts
#    job_complete_callback (callable): Called when a job completes
#
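# Example (a minimal sketch of hypothetical frontend code; assumes an
# initialized Context and Project, with 'hello.bst' as a placeholder
# target name):
#
#     from datetime import datetime
#
#     stream = Stream(context, project, datetime.now())
#     elements = stream.load_selection(('hello.bst',))
#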
class Stream():

    def __init__(self, context, project, session_start, *,
                 session_start_callback=None,
                 interrupt_callback=None,
                 ticker_callback=None,
                 job_start_callback=None,
                 job_complete_callback=None):

        #
        # Public members
        #
        self.targets = []            # Resolved target elements
        self.session_elements = []   # List of elements being processed this session
        self.total_elements = []     # Total list of elements based on targets
        self.queues = []             # Queue objects

        #
        # Private members
        #
        self._artifacts = context.artifactcache
        self._context = context
        self._project = project
        self._pipeline = Pipeline(context, project, self._artifacts)
        self._scheduler = Scheduler(context, session_start,
                                    interrupt_callback=interrupt_callback,
                                    ticker_callback=ticker_callback,
                                    job_start_callback=job_start_callback,
                                    job_complete_callback=job_complete_callback)
        self._first_non_track_queue = None
        self._session_start_callback = session_start_callback

    # cleanup()
    #
    # Cleans up application state
    #
    def cleanup(self):
        if self._project:
            self._project.cleanup()
    # load_selection()
    #
    # An all-purpose method for loading a selection of elements; this
    # is primarily useful for the frontend to implement `bst show`
    # and `bst shell`.
    #
    # Args:
    #    targets (list of str): Targets to load
    #    selection (PipelineSelection): The selection mode for the specified targets
    #    except_targets (list of str): Specified targets to except from the selection
    #
    # Returns:
    #    (list of Element): The selected elements
    def load_selection(self, targets, *,
                       selection=PipelineSelection.NONE,
                       except_targets=()):
        elements, _ = self._load(targets, (),
                                 selection=selection,
                                 except_targets=except_targets,
                                 fetch_subprojects=False)
        return elements
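
    # Example (a minimal sketch of hypothetical frontend code; 'hello.bst'
    # is a placeholder element name):
    #
    #     elements = stream.load_selection(('hello.bst',),
    #                                      selection=PipelineSelection.ALL)
    #     for element in elements:
    #         print(element.name)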

    # shell()
    #
    # Run a shell
    #
    # Args:
    #    element (Element): An Element object to run the shell for
    #    scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN)
    #    prompt (str): The prompt to display in the shell
    #    directory (str): A directory where an existing prestaged sysroot is expected, or None
    #    mounts (list of HostMount): Additional directories to mount into the sandbox
    #    isolate (bool): Whether to isolate the environment like we do in builds
    #    command (list): An argv to launch in the sandbox, or None
    #
    # Returns:
    #    (int): The exit code of the launched shell
    #
    def shell(self, element, scope, prompt, *,
              directory=None,
              mounts=None,
              isolate=False,
              command=None):

        # Assert we have everything we need built, unless the directory is specified
        # in which case we just blindly trust the directory, using the element
        # definitions to control the execution environment only.
        if directory is None:
            missing_deps = [
                dep._get_full_name()
                for dep in self._pipeline.dependencies([element], scope)
                if not dep._cached()
            ]
            if missing_deps:
                raise StreamError("Elements need to be built or downloaded before staging a shell environment",
                                  detail="\n".join(missing_deps))

        return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)
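
    # Example (hypothetical frontend call; assumes `element` was obtained
    # via load_selection() and its runtime dependencies are cached):
    #
    #     exitcode = stream.shell(element, Scope.RUN, '[bst] $ ',
    #                             command=['sh', '-c', 'ls /'])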

    # build()
    #
    # Builds (assembles) elements in the pipeline.
    #
    # Args:
    #    targets (list of str): Targets to build
    #    track_targets (list of str): Specified targets for tracking
    #    track_except (list of str): Specified targets to except from tracking
    #    track_cross_junctions (bool): Whether tracking should cross junction boundaries
    #    build_all (bool): Whether to build all elements, or only those
    #                      which are required to build the target.
    #
    def build(self, targets, *,
              track_targets=None,
              track_except=None,
              track_cross_junctions=False,
              build_all=False):

        if build_all:
            selection = PipelineSelection.ALL
        else:
            selection = PipelineSelection.PLAN

        elements, track_elements = \
            self._load(targets, track_targets,
                       selection=selection, track_selection=PipelineSelection.ALL,
                       track_except_targets=track_except,
                       track_cross_junctions=track_cross_junctions,
                       use_artifact_config=True,
                       fetch_subprojects=True,
                       dynamic_plan=True)

        # Remove the tracking elements from the main targets
        elements = self._pipeline.subtract_elements(elements, track_elements)

        # Assert that the elements we're not going to track are consistent
        self._pipeline.assert_consistent(elements)

        # Now construct the queues
        #
        track_queue = None
        if track_elements:
            track_queue = TrackQueue(self._scheduler)
            self._add_queue(track_queue, track=True)

        if self._artifacts.has_fetch_remotes():
            self._add_queue(PullQueue(self._scheduler))

        self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
        self._add_queue(BuildQueue(self._scheduler))

        if self._artifacts.has_push_remotes():
            self._add_queue(PushQueue(self._scheduler))

        # Enqueue elements
        #
        if track_elements:
            self._enqueue_plan(track_elements, queue=track_queue)
        self._enqueue_plan(elements)
        self._run()
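
    # Example (hypothetical frontend call, roughly what `bst build --track`
    # might do for a placeholder target 'hello.bst'):
    #
    #     stream.build(['hello.bst'], track_targets=['hello.bst'])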

    # fetch()
    #
    # Fetches sources on the pipeline.
    #
    # Args:
    #    targets (list of str): Targets to fetch
    #    selection (PipelineSelection): The selection mode for the specified targets
    #    except_targets (list of str): Specified targets to except from fetching
    #    track_targets (bool): Whether to track selected targets in addition to fetching
    #    track_cross_junctions (bool): Whether tracking should cross junction boundaries
    #
    def fetch(self, targets, *,
              selection=PipelineSelection.PLAN,
              except_targets=None,
              track_targets=False,
              track_cross_junctions=False):

        if track_targets:
            track_targets = targets
            track_selection = selection
            track_except_targets = except_targets
        else:
            track_targets = ()
            track_selection = PipelineSelection.NONE
            track_except_targets = ()

        elements, track_elements = \
            self._load(targets, track_targets,
                       selection=selection, track_selection=track_selection,
                       except_targets=except_targets,
                       track_except_targets=track_except_targets,
                       track_cross_junctions=track_cross_junctions,
                       fetch_subprojects=True)

        # Delegated to a shared fetch method
        self._fetch(elements, track_elements=track_elements)
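
    # Example (hypothetical frontend call; fetches the sources needed to
    # build the placeholder target 'hello.bst'):
    #
    #     stream.fetch(['hello.bst'], selection=PipelineSelection.PLAN)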

    # track()
    #
    # Tracks all the sources of the selected elements.
    #
    # Args:
    #    targets (list of str): Targets to track
    #    selection (PipelineSelection): The selection mode for the specified targets
    #    except_targets (list of str): Specified targets to except from tracking
    #    cross_junctions (bool): Whether tracking should cross junction boundaries
    #
    # If no error is encountered while tracking, then the project files
    # are rewritten inline.
    #
    def track(self, targets, *,
              selection=PipelineSelection.REDIRECT,
              except_targets=None,
              cross_junctions=False):

        # We pass no build targets here, only tracking targets. Passing
        # build targets would fully load the project configuration, which
        # might not be possible before tracking is done.
        _, elements = \
            self._load([], targets,
                       selection=selection, track_selection=selection,
                       except_targets=except_targets,
                       track_except_targets=except_targets,
                       track_cross_junctions=cross_junctions,
                       fetch_subprojects=True)

        track_queue = TrackQueue(self._scheduler)
        self._add_queue(track_queue, track=True)
        self._enqueue_plan(elements, queue=track_queue)
        self._run()
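
    # Example (hypothetical frontend call; updates source references for
    # the placeholder target 'hello.bst' and rewrites the project files):
    #
    #     stream.track(['hello.bst'])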

    # pull()
    #
    # Pulls artifacts from remote artifact server(s)
    #
    # Args:
    #    targets (list of str): Targets to pull
    #    selection (PipelineSelection): The selection mode for the specified targets
    #    remote (str): The URL of a specific remote server to pull from, or None
    #
    # If `remote` is None, then the regular configuration will be used
    # to determine where to pull artifacts from.
    #
    def pull(self, targets, *,
             selection=PipelineSelection.NONE,
             remote=None):

        use_config = True
        if remote:
            use_config = False

        elements, _ = self._load(targets, (),
                                 selection=selection,
                                 use_artifact_config=use_config,
                                 artifact_remote_url=remote,
                                 fetch_subprojects=True)

        if not self._artifacts.has_fetch_remotes():
            raise StreamError("No artifact caches available for pulling artifacts")

        self._pipeline.assert_consistent(elements)
        self._add_queue(PullQueue(self._scheduler))
        self._enqueue_plan(elements)
        self._run()
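
    # Example (hypothetical frontend call; the remote URL is a placeholder):
    #
    #     stream.pull(['hello.bst'], remote='https://cache.example.com')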

    # push()
    #
    # Pushes artifacts to remote artifact server(s)
    #
    # Args:
    #    targets (list of str): Targets to push
    #    selection (PipelineSelection): The selection mode for the specified targets
    #    remote (str): The URL of a specific remote server to push to, or None
    #
    # If `remote` is None, then the regular configuration will be used
    # to determine where to push artifacts to.
    #
    def push(self, targets, *,
             selection=PipelineSelection.NONE,
             remote=None):

        use_config = True
        if remote:
            use_config = False

        elements, _ = self._load(targets, (),
                                 selection=selection,
                                 use_artifact_config=use_config,
                                 artifact_remote_url=remote,
                                 fetch_subprojects=True)

        if not self._artifacts.has_push_remotes():
            raise StreamError("No artifact caches available for pushing artifacts")

        # Mark all dependencies of all selected elements as "pulled" before
        # trying to push.
        #
        # In non-strict mode, elements which are cached by their weak keys
        # will attempt to pull a remote artifact by its strict key and
        # prefer a strict key artifact; however, no pull actually occurs
        # when running a `bst push` session.
        #
        # Marking the elements as pulled is a workaround which ensures that
        # the cache keys are resolved before pushing.
        #
        for element in elements:
            element._pull_done()

        self._pipeline.assert_consistent(elements)
        self._add_queue(PushQueue(self._scheduler))
        self._enqueue_plan(elements)
        self._run()
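
    # Example (hypothetical frontend call; the remote URL is a placeholder):
    #
    #     stream.push(['hello.bst'], remote='https://cache.example.com')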

    # checkout()
    #
    # Checkout target artifact to the specified location
    #
    # Args:
    #    target (str): Target to checkout
    #    location (str): Location to checkout the artifact to
    #    force (bool): Whether files can be overwritten if necessary
    #    deps (str): The dependencies to checkout
    #    integrate (bool): Whether to run integration commands
    #    hardlinks (bool): Whether checking out files hardlinked to
    #                      their artifacts is acceptable
    #    tar (bool): If true, a tarball from the artifact contents will
    #                be created, otherwise the file tree of the artifact
    #                will be placed at the given location. If true and
    #                location is '-', the tarball will be dumped on the
    #                standard output.
    #
    def checkout(self, target, *,
                 location=None,
                 force=False,
                 deps='run',
                 integrate=True,
                 hardlinks=False,
                 tar=False):

        # We only have one target in a checkout command
        elements, _ = self._load((target,), (), fetch_subprojects=True)
        target = elements[0]

        if not tar:
            try:
                os.makedirs(location, exist_ok=True)
            except OSError as e:
                raise StreamError("Failed to create checkout directory: '{}'"
                                  .format(e)) from e
            if not os.access(location, os.W_OK):
                raise StreamError("Checkout directory '{}' not writable"
                                  .format(location))
            if not force and os.listdir(location):
                raise StreamError("Checkout directory '{}' not empty"
                                  .format(location))
        elif os.path.exists(location) and location != '-':
            if not os.access(location, os.W_OK):
                raise StreamError("Output file '{}' not writable"
                                  .format(location))
            if not force:
                raise StreamError("Output file '{}' already exists"
                                  .format(location))

        # Stage deps into a temporary sandbox first
        try:
            with target._prepare_sandbox(Scope.RUN, None, deps=deps,
                                         integrate=integrate) as sandbox:

                # Copy or move the sandbox to the target directory
                sandbox_root = sandbox.get_directory()
                if not tar:
                    with target.timed_activity("Checking out files in '{}'"
                                               .format(location)):
                        try:
                            if hardlinks:
                                self._checkout_hardlinks(sandbox_root, location)
                            else:
                                utils.copy_files(sandbox_root, location)
                        except OSError as e:
                            raise StreamError("Failed to checkout files: '{}'"
                                              .format(e)) from e
                else:
                    if location == '-':
                        with target.timed_activity("Creating tarball"):
                            with os.fdopen(sys.stdout.fileno(), 'wb') as fo:
                                with tarfile.open(fileobj=fo, mode="w|") as tf:
                                    Stream._add_directory_to_tarfile(
                                        tf, sandbox_root, '.')
                    else:
                        with target.timed_activity("Creating tarball '{}'"
                                                   .format(location)):
                            with tarfile.open(location, "w:") as tf:
                                Stream._add_directory_to_tarfile(
                                    tf, sandbox_root, '.')

        except BstError as e:
            raise StreamError("Error while staging dependencies into a sandbox"
                              ": '{}'".format(e), detail=e.detail, reason=e.reason) from e
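
    # Example (hypothetical frontend call; checks out the artifact of a
    # placeholder target into /tmp/checkout):
    #
    #     stream.checkout('hello.bst', location='/tmp/checkout', force=True)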

    # workspace_open
    #
    # Open a project workspace
    #
    # Args:
    #    target (str): The target element to open the workspace for
    #    directory (str): The directory to stage the source in
    #    no_checkout (bool): Whether to skip checking out the source
    #    track_first (bool): Whether to track and fetch first
    #    force (bool): Whether to ignore contents in an existing directory
    #
    def workspace_open(self, target, directory, *,
                       no_checkout,
                       track_first,
                       force):

        if track_first:
            track_targets = (target,)
        else:
            track_targets = ()

        elements, track_elements = self._load((target,), track_targets,
                                              selection=PipelineSelection.REDIRECT,
                                              track_selection=PipelineSelection.REDIRECT)
        target = elements[0]
        workdir = os.path.abspath(directory)

        if not list(target.sources()):
            build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)]
            if not build_depends:
                raise StreamError("The given element has no sources")
            detail = "Try opening a workspace on one of its dependencies instead:\n"
            detail += "  \n".join(build_depends)
            raise StreamError("The given element has no sources", detail=detail)

        workspaces = self._context.get_workspaces()

        # Check for workspace config
        workspace = workspaces.get_workspace(target._get_full_name())
        if workspace and not force:
            raise StreamError("Workspace '{}' is already defined at: {}"
                              .format(target.name, workspace.path))

        # If we're going to checkout, we need at least a fetch;
        # if we were asked to track first, we're going to fetch anyway.
        #
        if not no_checkout or track_first:
            track_elements = []
            if track_first:
                track_elements = elements
            self._fetch(elements, track_elements=track_elements)

        if not no_checkout and target._get_consistency() != Consistency.CACHED:
            raise StreamError("Could not stage uncached source. " +
                              "Use `--track` to track and " +
                              "fetch the latest version of the " +
                              "source.")

        if workspace:
            workspaces.delete_workspace(target._get_full_name())
            workspaces.save_config()
            shutil.rmtree(directory)
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            raise StreamError("Failed to create workspace directory: {}".format(e)) from e

        workspaces.create_workspace(target._get_full_name(), workdir)

        if not no_checkout:
            with target.timed_activity("Staging sources to {}".format(directory)):
                target._open_workspace()

        workspaces.save_config()
        self._message(MessageType.INFO, "Saved workspace configuration")
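
    # Example (hypothetical frontend call, roughly what `bst workspace open`
    # might do; the directory path is a placeholder):
    #
    #     stream.workspace_open('hello.bst', '/home/user/workspaces/hello',
    #                           no_checkout=False, track_first=False,
    #                           force=False)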

    # workspace_close
    #
    # Close a project workspace
    #
    # Args:
    #    element_name (str): The element name to close the workspace for
    #    remove_dir (bool): Whether to remove the associated directory
    #
    def workspace_close(self, element_name, *, remove_dir):
        workspaces = self._context.get_workspaces()
        workspace = workspaces.get_workspace(element_name)

        # Remove the workspace directory if requested
        if remove_dir:
            with self._context.timed_activity("Removing workspace directory {}"
                                              .format(workspace.path)):
                try:
                    shutil.rmtree(workspace.path)
                except OSError as e:
                    raise StreamError("Could not remove '{}': {}"
                                      .format(workspace.path, e)) from e

        # Delete the workspace and save the configuration
        workspaces.delete_workspace(element_name)
        workspaces.save_config()
        self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
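
    # Example (hypothetical frontend call):
    #
    #     stream.workspace_close('hello.bst', remove_dir=True)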

    # workspace_reset
    #
    # Reset a workspace to its original state, discarding any user
    # changes.
    #
    # Args:
    #    targets (list of str): The target elements to reset the workspace for
    #    soft (bool): Only reset workspace state
    #    track_first (bool): Whether to also track the sources first
    #
    def workspace_reset(self, targets, *, soft, track_first):

        if track_first:
            track_targets = targets
        else:
            track_targets = ()

        elements, track_elements = self._load(targets, track_targets,
                                              selection=PipelineSelection.REDIRECT,
                                              track_selection=PipelineSelection.REDIRECT)

        nonexisting = []
        for element in elements:
            if not self.workspace_exists(element.name):
                nonexisting.append(element.name)
        if nonexisting:
            raise StreamError("Workspace does not exist", detail="\n".join(nonexisting))

        # Do the tracking first
        if track_first:
            self._fetch(elements, track_elements=track_elements)

        workspaces = self._context.get_workspaces()

        for element in elements:
            workspace = workspaces.get_workspace(element._get_full_name())

            if soft:
                workspace.prepared = False
                self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
                              .format(element.name, workspace.path))
                continue

            with element.timed_activity("Removing workspace directory {}"
                                        .format(workspace.path)):
                try:
                    shutil.rmtree(workspace.path)
                except OSError as e:
                    raise StreamError("Could not remove '{}': {}"
                                      .format(workspace.path, e)) from e

            workspaces.delete_workspace(element._get_full_name())
            workspaces.create_workspace(element._get_full_name(), workspace.path)

            with element.timed_activity("Staging sources to {}".format(workspace.path)):
                element._open_workspace()

            self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path))

        workspaces.save_config()

    # workspace_exists
    #
    # Check if a workspace exists
    #
    # Args:
    #    element_name (str): The element name to check the workspace for, or None
    #
    # Returns:
    #    (bool): True if the workspace exists
    #
    # If None is specified for `element_name`, then this will return
    # True if there are any existing workspaces.
    #
    def workspace_exists(self, element_name=None):
        workspaces = self._context.get_workspaces()
        if element_name:
            workspace = workspaces.get_workspace(element_name)
            if workspace:
                return True
        elif any(workspaces.list()):
            return True

        return False
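
    # Example (hypothetical frontend check):
    #
    #     if stream.workspace_exists('hello.bst'):
    #         stream.workspace_close('hello.bst', remove_dir=False)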

    # workspace_list
    #
    # Serializes the workspaces and dumps them in YAML to stdout.
    #
    def workspace_list(self):
        workspaces = []
        for element_name, workspace_ in self._context.get_workspaces().list():
            workspace_detail = {
                'element': element_name,
                'directory': workspace_.path,
            }
            workspaces.append(workspace_detail)

        _yaml.dump({
            'workspaces': workspaces
        })
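
    # Example output (a sketch of the dumped YAML shape, following the
    # dictionary built above; names and paths are placeholders):
    #
    #     workspaces:
    #     - element: hello.bst
    #       directory: /home/user/workspaces/hello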

    # source_bundle()
    #
    # Create a host-buildable tarball bundle for the given target.
    #
    # Args:
    #    target (str): The target element to bundle
    #    directory (str): The directory to output the tarball
    #    track_first (bool): Track new source references before bundling
    #    compression (str): The compression type to use
    #    force (bool): Overwrite an existing tarball
    #    except_targets (list of str): Specified targets to except from bundling
    #
    def source_bundle(self, target, directory, *,
                      track_first=False,
                      force=False,
                      compression="gz",
                      except_targets=()):

        if track_first:
            track_targets = (target,)
        else:
            track_targets = ()

        elements, track_elements = self._load((target,), track_targets,
                                              selection=PipelineSelection.ALL,
                                              except_targets=except_targets,
                                              track_selection=PipelineSelection.ALL,
                                              fetch_subprojects=True)

        # source-bundle only supports one target
        target = self.targets[0]

        self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))

        # Find the correct filename for the compression algorithm
        tar_location = os.path.join(directory, target.normal_name + ".tar")
        if compression != "none":
            tar_location += "." + compression

        # Attempt writing a file to generate a good error message
        # early
        #
        # FIXME: A bit hackish
        try:
            open(tar_location, mode="x")
            os.remove(tar_location)
        except IOError as e:
            raise StreamError("Cannot write to {0}: {1}"
                              .format(tar_location, e)) from e

        # Fetch and possibly track first
        #
        self._fetch(elements, track_elements=track_elements)

        # We don't use the scheduler for this as it is almost entirely IO
        # bound.

        # Create a temporary directory to build the source tree in
        builddir = self._context.builddir
        prefix = "{}-".format(target.normal_name)

        with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
            source_directory = os.path.join(tempdir, 'source')
            try:
                os.makedirs(source_directory)
            except OSError as e:
                raise StreamError("Failed to create directory: {}"
                                  .format(e)) from e

            # Any elements that don't implement _write_script
            # should not be included in the later stages.
            elements = [
                element for element in elements
                if self._write_element_script(source_directory, element)
            ]

            self._write_element_sources(tempdir, elements)
            self._write_build_script(tempdir, elements)
            self._collect_sources(tempdir, tar_location,
                                  target.normal_name, compression)
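
    # Example (hypothetical frontend call; writes something like
    # hello.tar.gz into /tmp):
    #
    #     stream.source_bundle('hello.bst', '/tmp', compression='gz')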

    # redirect_element_names()
    #
    # Takes a list of element names and returns a list in which each
    # element has been redirected to its source element if the element
    # file exists, or left as just the name if it does not.
    #
    # Args:
    #    elements (list of str): The element names to redirect
    #
    # Returns:
    #    (list of str): The element names after redirecting
    #
    def redirect_element_names(self, elements):
        element_dir = self._project.element_path
        load_elements = []
        output_elements = set()

        for e in elements:
            element_path = os.path.join(element_dir, e)
            if os.path.exists(element_path):
                load_elements.append(e)
            else:
                output_elements.add(e)
        if load_elements:
            loaded_elements, _ = self._load(load_elements, (),
                                            selection=PipelineSelection.REDIRECT,
                                            track_selection=PipelineSelection.REDIRECT)

            for e in loaded_elements:
                output_elements.add(e.name)

        return list(output_elements)
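
    # Example (hypothetical frontend call; returns the possibly
    # redirected names for a placeholder element):
    #
    #     names = stream.redirect_element_names(['app.bst'])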

    #############################################################
    #                 Scheduler API forwarding                  #
    #############################################################

    # running
    #
    # Whether the scheduler is running
    #
    @property
    def running(self):
        return self._scheduler.loop is not None

    # suspended
    #
    # Whether the scheduler is currently suspended
    #
    @property
    def suspended(self):
        return self._scheduler.suspended

    # terminated
    #
    # Whether the scheduler is currently terminated
    #
    @property
    def terminated(self):
        return self._scheduler.terminated

    # elapsed_time
    #
    # Elapsed time since the session start
    #
    @property
    def elapsed_time(self):
        return self._scheduler.elapsed_time()

    # terminate()
    #
    # Terminate jobs
    #
    def terminate(self):
        self._scheduler.terminate_jobs()

    # quit()
    #
    # Quit the session. Ongoing jobs will be allowed to complete;
    # use Stream.terminate() instead to cancel ongoing jobs.
    #
    def quit(self):
        self._scheduler.stop_queueing()

    # suspend()
    #
    # Context manager to suspend ongoing jobs
    #
    @contextmanager
    def suspend(self):
        with self._scheduler.jobs_suspended():
            yield

    #############################################################
    #                    Private Methods                        #
    #############################################################

    # _load()
    #
    # A convenience method for loading element lists
    #
    # If `targets` is not empty, the project configuration in use will be
    # fully loaded. If `targets` is empty, tracking will still be
    # resolved for elements in `track_targets`, but no build pipeline
    # will be resolved. This behavior is important so that track() does
    # not trigger a full load of the project configuration.
    #
    # Args:
    #    targets (list of str): Main targets to load
    #    track_targets (list of str): Tracking targets
    #    selection (PipelineSelection): The selection mode for the specified targets
    #    track_selection (PipelineSelection): The selection mode for the specified tracking targets
    #    except_targets (list of str): Specified targets to except from fetching
    #    track_except_targets (list of str): Specified targets to except from tracking
    #    track_cross_junctions (bool): Whether tracking should cross junction boundaries
    #    use_artifact_config (bool): Whether to initialize artifacts with the config
    #    artifact_remote_url (str): A remote url for initializing the artifacts
    #    fetch_subprojects (bool): Whether to fetch subprojects while loading
    #    dynamic_plan (bool): Whether to rely on a dynamic build plan, marking
    #                         only the top-level targets as initially required
    #
    # Returns:
    #    (list of Element): The primary element selection
    #    (list of Element): The tracking element selection
    #
    def _load(self, targets, track_targets, *,
              selection=PipelineSelection.NONE,
              track_selection=PipelineSelection.NONE,
              except_targets=(),
              track_except_targets=(),
              track_cross_junctions=False,
              use_artifact_config=False,
              artifact_remote_url=None,
              fetch_subprojects=False,
              dynamic_plan=False):

        # Load rewritable if we have any tracking selection to make
        rewritable = False
        if track_targets:
            rewritable = True

        # Load all targets
        elements, except_elements, track_elements, track_except_elements = \
            self._pipeline.load([targets, except_targets, track_targets, track_except_targets],
                                rewritable=rewritable,
                                fetch_subprojects=fetch_subprojects)

        # Hold on to the targets
        self.targets = elements

        # Here we should raise an error if the track_elements targets
        # are not dependencies of the primary targets, this is not
        # supported.
        #
        # This can happen with `bst build --track`
        #
        if targets and not self._pipeline.targets_include(elements, track_elements):
            raise StreamError("Specified tracking targets that are not "
                              "within the scope of primary targets")

        # First take care of marking tracking elements, this must be
        # done before resolving element states.
        #
        assert track_selection != PipelineSelection.PLAN

        # Tracked elements are split by owner projects in order to
        # filter cross-junction tracking dependencies on their
        # respective projects.
        track_projects = {}
        for element in track_elements:
            project = element._get_project()
            if project not in track_projects:
                track_projects[project] = [element]
            else:
                track_projects[project].append(element)

        track_selected = []

        for project, project_elements in track_projects.items():
            selected = self._pipeline.get_selection(project_elements, track_selection)
            selected = self._pipeline.track_cross_junction_filter(project,
                                                                  selected,
                                                                  track_cross_junctions)
            track_selected.extend(selected)

        track_selected = self._pipeline.except_elements(track_elements,
                                                        track_selected,
                                                        track_except_elements)

        for element in track_selected:
            element._schedule_tracking()

        if not targets:
            self._pipeline.resolve_elements(track_selected)
            return [], track_selected

        # ArtifactCache.setup_remotes expects all projects to be fully loaded
        for project in self._context.get_projects():
            project.ensure_fully_loaded()

        # Connect to remote caches, this needs to be done before resolving element state
        self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)

        # Now move on to loading the primary selection.
        #
        self._pipeline.resolve_elements(elements)
        selected = self._pipeline.get_selection(elements, selection, silent=False)
        selected = self._pipeline.except_elements(elements,
                                                  selected,
                                                  except_elements)

        # Set the "required" artifacts that should not be removed
        # while this pipeline is active
        #
        # It must include all the artifacts which are required by the
        # final product. Note that this is a superset of the build plan.
        #
        self._artifacts.mark_required_elements(self._pipeline.dependencies(elements, Scope.ALL))

        if selection == PipelineSelection.PLAN and dynamic_plan:
            # We use a dynamic build plan, only requesting artifacts of
            # top-level targets; others are requested dynamically as needed.
            # This avoids pulling, fetching, or building unneeded build-only
            # dependencies.
            for element in elements:
                element._set_required()
        else:
            for element in selected:
                element._set_required()

        return selected, track_selected
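
    # Example (internal usage sketch, mirroring what fetch() does above):
    #
    #     elements, track_elements = \
    #         self._load(targets, track_targets,
    #                    selection=PipelineSelection.PLAN,
    #                    fetch_subprojects=True)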

    # _message()
    #
    # Local message propagator
    #
    def _message(self, message_type, message, **kwargs):
        args = dict(kwargs)
        self._context.message(
            Message(None, message_type, message, **args))

    # _add_queue()
    #
    # Adds a queue to the stream
    #
    # Args:
    #    queue (Queue): Queue to add to the pipeline
    #    track (bool): Whether this is the tracking queue
    #
    def _add_queue(self, queue, *, track=False):
        self.queues.append(queue)

        if not (track or self._first_non_track_queue):
            self._first_non_track_queue = queue

    # _enqueue_plan()
    #
    # Enqueues planned elements to the specified queue.
    #
    # Args:
    #    plan (list of Element): The list of elements to be enqueued
    #    queue (Queue): The target queue, defaults to the first non-track queue
    #
    def _enqueue_plan(self, plan, *, queue=None):
        queue = queue or self._first_non_track_queue

        queue.enqueue(plan)
        self.session_elements += plan

    # _run()
    #
    # Common function for running the scheduler
    #
    def _run(self):

        # Inform the frontend of the full list of elements
        # and the list of elements which will be processed in this run
        #
        self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))

        if self._session_start_callback is not None:
            self._session_start_callback()

        _, status = self._scheduler.run(self.queues)

        if status == SchedStatus.ERROR:
            raise StreamError()
        elif status == SchedStatus.TERMINATED:
            raise StreamError(terminated=True)

    # _fetch()
    #
    # Performs the fetch job, the body of this function is here because
    # it is shared between a few internals.
    #
    # Args:
    #    elements (list of Element): Elements to fetch
    #    track_elements (list of Element): Elements to track
    #
    def _fetch(self, elements, *, track_elements=None):

        if track_elements is None:
            track_elements = []

        # Subtract the track elements from the fetch elements, they will be added separately
        fetch_plan = self._pipeline.subtract_elements(elements, track_elements)

        # Assert consistency for the fetch elements
        self._pipeline.assert_consistent(fetch_plan)

        # Filter out elements with cached sources, only from the fetch plan,
        # and let the track plan resolve new refs.
        cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
        fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)

        # Construct queues, enqueue and run
        #
        track_queue = None
        if track_elements:
            track_queue = TrackQueue(self._scheduler)
            self._add_queue(track_queue, track=True)
        self._add_queue(FetchQueue(self._scheduler))

        if track_elements:
            self._enqueue_plan(track_elements, queue=track_queue)
        self._enqueue_plan(fetch_plan)
        self._run()

    # Helper function for checkout()
    #
    def _checkout_hardlinks(self, sandbox_root, directory):
        try:
            removed = utils.safe_remove(directory)
        except OSError as e:
            raise StreamError("Failed to remove checkout directory: {}".format(e)) from e

        if removed:
            # Try a simple rename of the sandbox root; if that
            # doesn't cut it, then do the regular link files code path
            try:
                os.rename(sandbox_root, directory)
            except OSError:
                os.makedirs(directory, exist_ok=True)
                utils.link_files(sandbox_root, directory)
        else:
            utils.link_files(sandbox_root, directory)

    # Add a directory entry deterministically to a tar file
    #
    # This function takes extra steps to ensure the output is deterministic.
    # First, it sorts the results of os.listdir() to ensure the ordering of
    # the files in the archive is the same.  Second, it sets a fixed
    # timestamp for each entry. See also https://bugs.python.org/issue24465.
    @staticmethod
    def _add_directory_to_tarfile(tf, dir_name, dir_arcname, mtime=0):
        for filename in sorted(os.listdir(dir_name)):
            name = os.path.join(dir_name, filename)
            arcname = os.path.join(dir_arcname, filename)

            tarinfo = tf.gettarinfo(name, arcname)
            tarinfo.mtime = mtime

            if tarinfo.isreg():
                with open(name, "rb") as f:
                    tf.addfile(tarinfo, f)
            elif tarinfo.isdir():
                tf.addfile(tarinfo)
                Stream._add_directory_to_tarfile(tf, name, arcname, mtime)
            else:
                tf.addfile(tarinfo)

    # Write the element build script to the given directory
    def _write_element_script(self, directory, element):
        try:
            element._write_script(directory)
        except ImplError:
            return False
        return True

    # Write all element sources to the given directory
    def _write_element_sources(self, directory, elements):
        for element in elements:
            source_dir = os.path.join(directory, "source")
            element_source_dir = os.path.join(source_dir, element.normal_name)

            element._stage_sources_at(element_source_dir)

    # Write a master build script to the sandbox
    def _write_build_script(self, directory, elements):

        module_string = ""
        for element in elements:
            module_string += shlex.quote(element.normal_name) + " "

        script_path = os.path.join(directory, "build.sh")

        with open(_site.build_all_template, "r") as f:
            script_template = f.read()

        with utils.save_file_atomic(script_path, "w") as script:
            script.write(script_template.format(modules=module_string))

        os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)

    # Collect the sources in the given sandbox into a tarfile
    def _collect_sources(self, directory, tar_name, element_name, compression):
        with self._context.timed_activity("Creating tarball {}".format(tar_name)):
            if compression == "none":
                permissions = "w:"
            else:
                permissions = "w:" + compression

            with tarfile.open(tar_name, permissions) as tar:
                tar.add(directory, arcname=element_name)