1# 2# Copyright (C) 2018 Codethink Limited 3# 4# This program is free software; you can redistribute it and/or 5# modify it under the terms of the GNU Lesser General Public 6# License as published by the Free Software Foundation; either 7# version 2 of the License, or (at your option) any later version. 8# 9# This library is distributed in the hope that it will be useful, 10# but WITHOUT ANY WARRANTY; without even the implied warranty of 11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12# Lesser General Public License for more details. 13# 14# You should have received a copy of the GNU Lesser General Public 15# License along with this library. If not, see <http://www.gnu.org/licenses/>. 16# 17# Authors: 18# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> 19# Jürg Billeter <juerg.billeter@codethink.co.uk> 20# Tristan Maat <tristan.maat@codethink.co.uk> 21 22import os 23import sys 24import stat 25import shlex 26import shutil 27import tarfile 28from contextlib import contextmanager 29from tempfile import TemporaryDirectory 30 31from ._exceptions import StreamError, ImplError, BstError 32from ._message import Message, MessageType 33from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue 34from ._pipeline import Pipeline, PipelineSelection 35from . import utils, _yaml, _site 36from . import Scope, Consistency 37 38 39# Stream() 40# 41# This is the main, toplevel calling interface in BuildStream core. 42# 43# Args: 44# context (Context): The Context object 45# project (Project): The Project object 46# session_start (datetime): The time when the session started 47# session_start_callback (callable): A callback to invoke when the session starts 48# interrupt_callback (callable): A callback to invoke when we get interrupted 49# ticker_callback (callable): Invoked every second while running the scheduler 50# job_start_callback (callable): Called when a job starts 51# job_complete_callback (callable): Called when a job completes 52# 53class Stream(): 54 55 def __init__(self, context, project, session_start, *, 56 session_start_callback=None, 57 interrupt_callback=None, 58 ticker_callback=None, 59 job_start_callback=None, 60 job_complete_callback=None): 61 62 # 63 # Public members 64 # 65 self.targets = [] # Resolved target elements 66 self.session_elements = [] # List of elements being processed this session 67 self.total_elements = [] # Total list of elements based on targets 68 self.queues = [] # Queue objects 69 70 # 71 # Private members 72 # 73 self._artifacts = context.artifactcache 74 self._context = context 75 self._project = project 76 self._pipeline = Pipeline(context, project, self._artifacts) 77 self._scheduler = Scheduler(context, session_start, 78 interrupt_callback=interrupt_callback, 79 ticker_callback=ticker_callback, 80 job_start_callback=job_start_callback, 81 job_complete_callback=job_complete_callback) 82 self._first_non_track_queue = None 83 self._session_start_callback = session_start_callback 84 85 # cleanup() 86 # 87 # Cleans up application state 88 # 89 def cleanup(self): 90 if self._project: 91 self._project.cleanup() 92 93 # load_selection() 94 # 95 # An all purpose method for loading a selection of elements, this 96 # is primarily useful for the frontend to implement `bst show` 97 # and `bst shell`. 98 # 99 # Args: 100 # targets (list of str): Targets to pull 101 # selection (PipelineSelection): The selection mode for the specified targets 102 # except_targets (list of str): Specified targets to except from fetching 103 # 104 # Returns: 105 # (list of Element): The selected elements 106 def load_selection(self, targets, *, 107 selection=PipelineSelection.NONE, 108 except_targets=()): 109 elements, _ = self._load(targets, (), 110 selection=selection, 111 except_targets=except_targets, 112 fetch_subprojects=False) 113 return elements 114 115 # shell() 116 # 117 # Run a shell 118 # 119 # Args: 120 # element (Element): An Element object to run the shell for 121 # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN) 122 # prompt (str): The prompt to display in the shell 123 # directory (str): A directory where an existing prestaged sysroot is expected, or None 124 # mounts (list of HostMount): Additional directories to mount into the sandbox 125 # isolate (bool): Whether to isolate the environment like we do in builds 126 # command (list): An argv to launch in the sandbox, or None 127 # 128 # Returns: 129 # (int): The exit code of the launched shell 130 # 131 def shell(self, element, scope, prompt, *, 132 directory=None, 133 mounts=None, 134 isolate=False, 135 command=None): 136 137 # Assert we have everything we need built, unless the directory is specified 138 # in which case we just blindly trust the directory, using the element 139 # definitions to control the execution environment only. 140 if directory is None: 141 missing_deps = [ 142 dep._get_full_name() 143 for dep in self._pipeline.dependencies([element], scope) 144 if not dep._cached() 145 ] 146 if missing_deps: 147 raise StreamError("Elements need to be built or downloaded before staging a shell environment", 148 detail="\n".join(missing_deps)) 149 150 return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command) 151 152 # build() 153 # 154 # Builds (assembles) elements in the pipeline. 155 # 156 # Args: 157 # targets (list of str): Targets to build 158 # track_targets (list of str): Specified targets for tracking 159 # track_except (list of str): Specified targets to except from tracking 160 # track_cross_junctions (bool): Whether tracking should cross junction boundaries 161 # build_all (bool): Whether to build all elements, or only those 162 # which are required to build the target. 163 # 164 def build(self, targets, *, 165 track_targets=None, 166 track_except=None, 167 track_cross_junctions=False, 168 build_all=False): 169 170 if build_all: 171 selection = PipelineSelection.ALL 172 else: 173 selection = PipelineSelection.PLAN 174 175 elements, track_elements = \ 176 self._load(targets, track_targets, 177 selection=selection, track_selection=PipelineSelection.ALL, 178 track_except_targets=track_except, 179 track_cross_junctions=track_cross_junctions, 180 use_artifact_config=True, 181 fetch_subprojects=True, 182 dynamic_plan=True) 183 184 # Remove the tracking elements from the main targets 185 elements = self._pipeline.subtract_elements(elements, track_elements) 186 187 # Assert that the elements we're not going to track are consistent 188 self._pipeline.assert_consistent(elements) 189 190 # Now construct the queues 191 # 192 track_queue = None 193 if track_elements: 194 track_queue = TrackQueue(self._scheduler) 195 self._add_queue(track_queue, track=True) 196 197 if self._artifacts.has_fetch_remotes(): 198 self._add_queue(PullQueue(self._scheduler)) 199 200 self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) 201 self._add_queue(BuildQueue(self._scheduler)) 202 203 if self._artifacts.has_push_remotes(): 204 self._add_queue(PushQueue(self._scheduler)) 205 206 # Enqueue elements 207 # 208 if track_elements: 209 self._enqueue_plan(track_elements, queue=track_queue) 210 self._enqueue_plan(elements) 211 self._run() 212 213 # fetch() 214 # 215 # Fetches sources on the pipeline. 216 # 217 # Args: 218 # targets (list of str): Targets to fetch 219 # selection (PipelineSelection): The selection mode for the specified targets 220 # except_targets (list of str): Specified targets to except from fetching 221 # track_targets (bool): Whether to track selected targets in addition to fetching 222 # track_cross_junctions (bool): Whether tracking should cross junction boundaries 223 # 224 def fetch(self, targets, *, 225 selection=PipelineSelection.PLAN, 226 except_targets=None, 227 track_targets=False, 228 track_cross_junctions=False): 229 230 if track_targets: 231 track_targets = targets 232 track_selection = selection 233 track_except_targets = except_targets 234 else: 235 track_targets = () 236 track_selection = PipelineSelection.NONE 237 track_except_targets = () 238 239 elements, track_elements = \ 240 self._load(targets, track_targets, 241 selection=selection, track_selection=track_selection, 242 except_targets=except_targets, 243 track_except_targets=track_except_targets, 244 track_cross_junctions=track_cross_junctions, 245 fetch_subprojects=True) 246 247 # Delegated to a shared fetch method 248 self._fetch(elements, track_elements=track_elements) 249 250 # track() 251 # 252 # Tracks all the sources of the selected elements. 253 # 254 # Args: 255 # targets (list of str): Targets to track 256 # selection (PipelineSelection): The selection mode for the specified targets 257 # except_targets (list of str): Specified targets to except from tracking 258 # cross_junctions (bool): Whether tracking should cross junction boundaries 259 # 260 # If no error is encountered while tracking, then the project files 261 # are rewritten inline. 262 # 263 def track(self, targets, *, 264 selection=PipelineSelection.REDIRECT, 265 except_targets=None, 266 cross_junctions=False): 267 268 # We pass no target to build. Only to track. Passing build targets 269 # would fully load project configuration which might not be 270 # possible before tracking is done. 271 _, elements = \ 272 self._load([], targets, 273 selection=selection, track_selection=selection, 274 except_targets=except_targets, 275 track_except_targets=except_targets, 276 track_cross_junctions=cross_junctions, 277 fetch_subprojects=True) 278 279 track_queue = TrackQueue(self._scheduler) 280 self._add_queue(track_queue, track=True) 281 self._enqueue_plan(elements, queue=track_queue) 282 self._run() 283 284 # pull() 285 # 286 # Pulls artifacts from remote artifact server(s) 287 # 288 # Args: 289 # targets (list of str): Targets to pull 290 # selection (PipelineSelection): The selection mode for the specified targets 291 # remote (str): The URL of a specific remote server to pull from, or None 292 # 293 # If `remote` specified as None, then regular configuration will be used 294 # to determine where to pull artifacts from. 295 # 296 def pull(self, targets, *, 297 selection=PipelineSelection.NONE, 298 remote=None): 299 300 use_config = True 301 if remote: 302 use_config = False 303 304 elements, _ = self._load(targets, (), 305 selection=selection, 306 use_artifact_config=use_config, 307 artifact_remote_url=remote, 308 fetch_subprojects=True) 309 310 if not self._artifacts.has_fetch_remotes(): 311 raise StreamError("No artifact caches available for pulling artifacts") 312 313 self._pipeline.assert_consistent(elements) 314 self._add_queue(PullQueue(self._scheduler)) 315 self._enqueue_plan(elements) 316 self._run() 317 318 # push() 319 # 320 # Pulls artifacts to remote artifact server(s) 321 # 322 # Args: 323 # targets (list of str): Targets to push 324 # selection (PipelineSelection): The selection mode for the specified targets 325 # remote (str): The URL of a specific remote server to push to, or None 326 # 327 # If `remote` specified as None, then regular configuration will be used 328 # to determine where to push artifacts to. 329 # 330 def push(self, targets, *, 331 selection=PipelineSelection.NONE, 332 remote=None): 333 334 use_config = True 335 if remote: 336 use_config = False 337 338 elements, _ = self._load(targets, (), 339 selection=selection, 340 use_artifact_config=use_config, 341 artifact_remote_url=remote, 342 fetch_subprojects=True) 343 344 if not self._artifacts.has_push_remotes(): 345 raise StreamError("No artifact caches available for pushing artifacts") 346 347 # Mark all dependencies of all selected elements as "pulled" before 348 # trying to push. 349 # 350 # In non-strict mode, elements which are cached by their weak keys 351 # will attempt to pull a remote artifact by it's strict key and prefer 352 # a strict key artifact, however pull does not occur when running 353 # a `bst push` session. 354 # 355 # Marking the elements as pulled is a workaround which ensures that 356 # the cache keys are resolved before pushing. 357 # 358 for element in elements: 359 element._pull_done() 360 361 self._pipeline.assert_consistent(elements) 362 self._add_queue(PushQueue(self._scheduler)) 363 self._enqueue_plan(elements) 364 self._run() 365 366 # checkout() 367 # 368 # Checkout target artifact to the specified location 369 # 370 # Args: 371 # target (str): Target to checkout 372 # location (str): Location to checkout the artifact to 373 # force (bool): Whether files can be overwritten if necessary 374 # deps (str): The dependencies to checkout 375 # integrate (bool): Whether to run integration commands 376 # hardlinks (bool): Whether checking out files hardlinked to 377 # their artifacts is acceptable 378 # tar (bool): If true, a tarball from the artifact contents will 379 # be created, otherwise the file tree of the artifact 380 # will be placed at the given location. If true and 381 # location is '-', the tarball will be dumped on the 382 # standard output. 383 # 384 def checkout(self, target, *, 385 location=None, 386 force=False, 387 deps='run', 388 integrate=True, 389 hardlinks=False, 390 tar=False): 391 392 # We only have one target in a checkout command 393 elements, _ = self._load((target,), (), fetch_subprojects=True) 394 target = elements[0] 395 396 if not tar: 397 try: 398 os.makedirs(location, exist_ok=True) 399 except OSError as e: 400 raise StreamError("Failed to create checkout directory: '{}'" 401 .format(e)) from e 402 403 if not tar: 404 if not os.access(location, os.W_OK): 405 raise StreamError("Checkout directory '{}' not writable" 406 .format(location)) 407 if not force and os.listdir(location): 408 raise StreamError("Checkout directory '{}' not empty" 409 .format(location)) 410 elif os.path.exists(location) and location != '-': 411 if not os.access(location, os.W_OK): 412 raise StreamError("Output file '{}' not writable" 413 .format(location)) 414 if not force and os.path.exists(location): 415 raise StreamError("Output file '{}' already exists" 416 .format(location)) 417 418 # Stage deps into a temporary sandbox first 419 try: 420 with target._prepare_sandbox(Scope.RUN, None, deps=deps, 421 integrate=integrate) as sandbox: 422 423 # Copy or move the sandbox to the target directory 424 sandbox_root = sandbox.get_directory() 425 if not tar: 426 with target.timed_activity("Checking out files in '{}'" 427 .format(location)): 428 try: 429 if hardlinks: 430 self._checkout_hardlinks(sandbox_root, location) 431 else: 432 utils.copy_files(sandbox_root, location) 433 except OSError as e: 434 raise StreamError("Failed to checkout files: '{}'" 435 .format(e)) from e 436 else: 437 if location == '-': 438 with target.timed_activity("Creating tarball"): 439 with os.fdopen(sys.stdout.fileno(), 'wb') as fo: 440 with tarfile.open(fileobj=fo, mode="w|") as tf: 441 Stream._add_directory_to_tarfile( 442 tf, sandbox_root, '.') 443 else: 444 with target.timed_activity("Creating tarball '{}'" 445 .format(location)): 446 with tarfile.open(location, "w:") as tf: 447 Stream._add_directory_to_tarfile( 448 tf, sandbox_root, '.') 449 450 except BstError as e: 451 raise StreamError("Error while staging dependencies into a sandbox" 452 ": '{}'".format(e), detail=e.detail, reason=e.reason) from e 453 454 # workspace_open 455 # 456 # Open a project workspace 457 # 458 # Args: 459 # target (str): The target element to open the workspace for 460 # directory (str): The directory to stage the source in 461 # no_checkout (bool): Whether to skip checking out the source 462 # track_first (bool): Whether to track and fetch first 463 # force (bool): Whether to ignore contents in an existing directory 464 # 465 def workspace_open(self, target, directory, *, 466 no_checkout, 467 track_first, 468 force): 469 470 if track_first: 471 track_targets = (target,) 472 else: 473 track_targets = () 474 475 elements, track_elements = self._load((target,), track_targets, 476 selection=PipelineSelection.REDIRECT, 477 track_selection=PipelineSelection.REDIRECT) 478 target = elements[0] 479 workdir = os.path.abspath(directory) 480 481 if not list(target.sources()): 482 build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)] 483 if not build_depends: 484 raise StreamError("The given element has no sources") 485 detail = "Try opening a workspace on one of its dependencies instead:\n" 486 detail += " \n".join(build_depends) 487 raise StreamError("The given element has no sources", detail=detail) 488 489 workspaces = self._context.get_workspaces() 490 491 # Check for workspace config 492 workspace = workspaces.get_workspace(target._get_full_name()) 493 if workspace and not force: 494 raise StreamError("Workspace '{}' is already defined at: {}" 495 .format(target.name, workspace.path)) 496 497 # If we're going to checkout, we need at least a fetch, 498 # if we were asked to track first, we're going to fetch anyway. 499 # 500 if not no_checkout or track_first: 501 track_elements = [] 502 if track_first: 503 track_elements = elements 504 self._fetch(elements, track_elements=track_elements) 505 506 if not no_checkout and target._get_consistency() != Consistency.CACHED: 507 raise StreamError("Could not stage uncached source. " + 508 "Use `--track` to track and " + 509 "fetch the latest version of the " + 510 "source.") 511 512 if workspace: 513 workspaces.delete_workspace(target._get_full_name()) 514 workspaces.save_config() 515 shutil.rmtree(directory) 516 try: 517 os.makedirs(directory, exist_ok=True) 518 except OSError as e: 519 raise StreamError("Failed to create workspace directory: {}".format(e)) from e 520 521 workspaces.create_workspace(target._get_full_name(), workdir) 522 523 if not no_checkout: 524 with target.timed_activity("Staging sources to {}".format(directory)): 525 target._open_workspace() 526 527 workspaces.save_config() 528 self._message(MessageType.INFO, "Saved workspace configuration") 529 530 # workspace_close 531 # 532 # Close a project workspace 533 # 534 # Args: 535 # element_name (str): The element name to close the workspace for 536 # remove_dir (bool): Whether to remove the associated directory 537 # 538 def workspace_close(self, element_name, *, remove_dir): 539 workspaces = self._context.get_workspaces() 540 workspace = workspaces.get_workspace(element_name) 541 542 # Remove workspace directory if prompted 543 if remove_dir: 544 with self._context.timed_activity("Removing workspace directory {}" 545 .format(workspace.path)): 546 try: 547 shutil.rmtree(workspace.path) 548 except OSError as e: 549 raise StreamError("Could not remove '{}': {}" 550 .format(workspace.path, e)) from e 551 552 # Delete the workspace and save the configuration 553 workspaces.delete_workspace(element_name) 554 workspaces.save_config() 555 self._message(MessageType.INFO, "Closed workspace for {}".format(element_name)) 556 557 # workspace_reset 558 # 559 # Reset a workspace to its original state, discarding any user 560 # changes. 561 # 562 # Args: 563 # targets (list of str): The target elements to reset the workspace for 564 # soft (bool): Only reset workspace state 565 # track_first (bool): Whether to also track the sources first 566 # 567 def workspace_reset(self, targets, *, soft, track_first): 568 569 if track_first: 570 track_targets = targets 571 else: 572 track_targets = () 573 574 elements, track_elements = self._load(targets, track_targets, 575 selection=PipelineSelection.REDIRECT, 576 track_selection=PipelineSelection.REDIRECT) 577 578 nonexisting = [] 579 for element in elements: 580 if not self.workspace_exists(element.name): 581 nonexisting.append(element.name) 582 if nonexisting: 583 raise StreamError("Workspace does not exist", detail="\n".join(nonexisting)) 584 585 # Do the tracking first 586 if track_first: 587 self._fetch(elements, track_elements=track_elements) 588 589 workspaces = self._context.get_workspaces() 590 591 for element in elements: 592 workspace = workspaces.get_workspace(element._get_full_name()) 593 594 if soft: 595 workspace.prepared = False 596 self._message(MessageType.INFO, "Reset workspace state for {} at: {}" 597 .format(element.name, workspace.path)) 598 continue 599 600 with element.timed_activity("Removing workspace directory {}" 601 .format(workspace.path)): 602 try: 603 shutil.rmtree(workspace.path) 604 except OSError as e: 605 raise StreamError("Could not remove '{}': {}" 606 .format(workspace.path, e)) from e 607 608 workspaces.delete_workspace(element._get_full_name()) 609 workspaces.create_workspace(element._get_full_name(), workspace.path) 610 611 with element.timed_activity("Staging sources to {}".format(workspace.path)): 612 element._open_workspace() 613 614 self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path)) 615 616 workspaces.save_config() 617 618 # workspace_exists 619 # 620 # Check if a workspace exists 621 # 622 # Args: 623 # element_name (str): The element name to close the workspace for, or None 624 # 625 # Returns: 626 # (bool): True if the workspace exists 627 # 628 # If None is specified for `element_name`, then this will return 629 # True if there are any existing workspaces. 630 # 631 def workspace_exists(self, element_name=None): 632 workspaces = self._context.get_workspaces() 633 if element_name: 634 workspace = workspaces.get_workspace(element_name) 635 if workspace: 636 return True 637 elif any(workspaces.list()): 638 return True 639 640 return False 641 642 # workspace_list 643 # 644 # Serializes the workspaces and dumps them in YAML to stdout. 645 # 646 def workspace_list(self): 647 workspaces = [] 648 for element_name, workspace_ in self._context.get_workspaces().list(): 649 workspace_detail = { 650 'element': element_name, 651 'directory': workspace_.path, 652 } 653 workspaces.append(workspace_detail) 654 655 _yaml.dump({ 656 'workspaces': workspaces 657 }) 658 659 # source_bundle() 660 # 661 # Create a host buildable tarball bundle for the given target. 662 # 663 # Args: 664 # target (str): The target element to bundle 665 # directory (str): The directory to output the tarball 666 # track_first (bool): Track new source references before bundling 667 # compression (str): The compression type to use 668 # force (bool): Overwrite an existing tarball 669 # 670 def source_bundle(self, target, directory, *, 671 track_first=False, 672 force=False, 673 compression="gz", 674 except_targets=()): 675 676 if track_first: 677 track_targets = (target,) 678 else: 679 track_targets = () 680 681 elements, track_elements = self._load((target,), track_targets, 682 selection=PipelineSelection.ALL, 683 except_targets=except_targets, 684 track_selection=PipelineSelection.ALL, 685 fetch_subprojects=True) 686 687 # source-bundle only supports one target 688 target = self.targets[0] 689 690 self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name)) 691 692 # Find the correct filename for the compression algorithm 693 tar_location = os.path.join(directory, target.normal_name + ".tar") 694 if compression != "none": 695 tar_location += "." + compression 696 697 # Attempt writing a file to generate a good error message 698 # early 699 # 700 # FIXME: A bit hackish 701 try: 702 open(tar_location, mode="x") 703 os.remove(tar_location) 704 except IOError as e: 705 raise StreamError("Cannot write to {0}: {1}" 706 .format(tar_location, e)) from e 707 708 # Fetch and possibly track first 709 # 710 self._fetch(elements, track_elements=track_elements) 711 712 # We don't use the scheduler for this as it is almost entirely IO 713 # bound. 714 715 # Create a temporary directory to build the source tree in 716 builddir = self._context.builddir 717 prefix = "{}-".format(target.normal_name) 718 719 with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir: 720 source_directory = os.path.join(tempdir, 'source') 721 try: 722 os.makedirs(source_directory) 723 except OSError as e: 724 raise StreamError("Failed to create directory: {}" 725 .format(e)) from e 726 727 # Any elements that don't implement _write_script 728 # should not be included in the later stages. 729 elements = [ 730 element for element in elements 731 if self._write_element_script(source_directory, element) 732 ] 733 734 self._write_element_sources(tempdir, elements) 735 self._write_build_script(tempdir, elements) 736 self._collect_sources(tempdir, tar_location, 737 target.normal_name, compression) 738 739 # redirect_element_names() 740 # 741 # Takes a list of element names and returns a list where elements have been 742 # redirected to their source elements if the element file exists, and just 743 # the name, if not. 744 # 745 # Args: 746 # elements (list of str): The element names to redirect 747 # 748 # Returns: 749 # (list of str): The element names after redirecting 750 # 751 def redirect_element_names(self, elements): 752 element_dir = self._project.element_path 753 load_elements = [] 754 output_elements = set() 755 756 for e in elements: 757 element_path = os.path.join(element_dir, e) 758 if os.path.exists(element_path): 759 load_elements.append(e) 760 else: 761 output_elements.add(e) 762 if load_elements: 763 loaded_elements, _ = self._load(load_elements, (), 764 selection=PipelineSelection.REDIRECT, 765 track_selection=PipelineSelection.REDIRECT) 766 767 for e in loaded_elements: 768 output_elements.add(e.name) 769 770 return list(output_elements) 771 772 ############################################################# 773 # Scheduler API forwarding # 774 ############################################################# 775 776 # running 777 # 778 # Whether the scheduler is running 779 # 780 @property 781 def running(self): 782 return self._scheduler.loop is not None 783 784 # suspended 785 # 786 # Whether the scheduler is currently suspended 787 # 788 @property 789 def suspended(self): 790 return self._scheduler.suspended 791 792 # terminated 793 # 794 # Whether the scheduler is currently terminated 795 # 796 @property 797 def terminated(self): 798 return self._scheduler.terminated 799 800 # elapsed_time 801 # 802 # Elapsed time since the session start 803 # 804 @property 805 def elapsed_time(self): 806 return self._scheduler.elapsed_time() 807 808 # terminate() 809 # 810 # Terminate jobs 811 # 812 def terminate(self): 813 self._scheduler.terminate_jobs() 814 815 # quit() 816 # 817 # Quit the session, this will continue with any ongoing 818 # jobs, use Stream.terminate() instead for cancellation 819 # of ongoing jobs 820 # 821 def quit(self): 822 self._scheduler.stop_queueing() 823 824 # suspend() 825 # 826 # Context manager to suspend ongoing jobs 827 # 828 @contextmanager 829 def suspend(self): 830 with self._scheduler.jobs_suspended(): 831 yield 832 833 ############################################################# 834 # Private Methods # 835 ############################################################# 836 837 # _load() 838 # 839 # A convenience method for loading element lists 840 # 841 # If `targets` is not empty used project configuration will be 842 # fully loaded. If `targets` is empty, tracking will still be 843 # resolved for elements in `track_targets`, but no build pipeline 844 # will be resolved. This is behavior is import for track() to 845 # not trigger full loading of project configuration. 846 # 847 # Args: 848 # targets (list of str): Main targets to load 849 # track_targets (list of str): Tracking targets 850 # selection (PipelineSelection): The selection mode for the specified targets 851 # track_selection (PipelineSelection): The selection mode for the specified tracking targets 852 # except_targets (list of str): Specified targets to except from fetching 853 # track_except_targets (list of str): Specified targets to except from fetching 854 # track_cross_junctions (bool): Whether tracking should cross junction boundaries 855 # use_artifact_config (bool): Whether to initialize artifacts with the config 856 # artifact_remote_url (bool): A remote url for initializing the artifacts 857 # fetch_subprojects (bool): Whether to fetch subprojects while loading 858 # 859 # Returns: 860 # (list of Element): The primary element selection 861 # (list of Element): The tracking element selection 862 # 863 def _load(self, targets, track_targets, *, 864 selection=PipelineSelection.NONE, 865 track_selection=PipelineSelection.NONE, 866 except_targets=(), 867 track_except_targets=(), 868 track_cross_junctions=False, 869 use_artifact_config=False, 870 artifact_remote_url=None, 871 fetch_subprojects=False, 872 dynamic_plan=False): 873 874 # Load rewritable if we have any tracking selection to make 875 rewritable = False 876 if track_targets: 877 rewritable = True 878 879 # Load all targets 880 elements, except_elements, track_elements, track_except_elements = \ 881 self._pipeline.load([targets, except_targets, track_targets, track_except_targets], 882 rewritable=rewritable, 883 fetch_subprojects=fetch_subprojects) 884 885 # Hold on to the targets 886 self.targets = elements 887 888 # Here we should raise an error if the track_elements targets 889 # are not dependencies of the primary targets, this is not 890 # supported. 891 # 892 # This can happen with `bst build --track` 893 # 894 if targets and not self._pipeline.targets_include(elements, track_elements): 895 raise StreamError("Specified tracking targets that are not " 896 "within the scope of primary targets") 897 898 # First take care of marking tracking elements, this must be 899 # done before resolving element states. 900 # 901 assert track_selection != PipelineSelection.PLAN 902 903 # Tracked elements are split by owner projects in order to 904 # filter cross junctions tracking dependencies on their 905 # respective project. 906 track_projects = {} 907 for element in track_elements: 908 project = element._get_project() 909 if project not in track_projects: 910 track_projects[project] = [element] 911 else: 912 track_projects[project].append(element) 913 914 track_selected = [] 915 916 for project, project_elements in track_projects.items(): 917 selected = self._pipeline.get_selection(project_elements, track_selection) 918 selected = self._pipeline.track_cross_junction_filter(project, 919 selected, 920 track_cross_junctions) 921 track_selected.extend(selected) 922 923 track_selected = self._pipeline.except_elements(track_elements, 924 track_selected, 925 track_except_elements) 926 927 for element in track_selected: 928 element._schedule_tracking() 929 930 if not targets: 931 self._pipeline.resolve_elements(track_selected) 932 return [], track_selected 933 934 # ArtifactCache.setup_remotes expects all projects to be fully loaded 935 for project in self._context.get_projects(): 936 project.ensure_fully_loaded() 937 938 # Connect to remote caches, this needs to be done before resolving element state 939 self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url) 940 941 # Now move on to loading primary selection. 942 # 943 self._pipeline.resolve_elements(elements) 944 selected = self._pipeline.get_selection(elements, selection, silent=False) 945 selected = self._pipeline.except_elements(elements, 946 selected, 947 except_elements) 948 949 # Set the "required" artifacts that should not be removed 950 # while this pipeline is active 951 # 952 # It must include all the artifacts which are required by the 953 # final product. Note that this is a superset of the build plan. 954 # 955 self._artifacts.mark_required_elements(self._pipeline.dependencies(elements, Scope.ALL)) 956 957 if selection == PipelineSelection.PLAN and dynamic_plan: 958 # We use a dynamic build plan, only request artifacts of top-level targets, 959 # others are requested dynamically as needed. 960 # This avoids pulling, fetching, or building unneeded build-only dependencies. 961 for element in elements: 962 element._set_required() 963 else: 964 for element in selected: 965 element._set_required() 966 967 return selected, track_selected 968 969 # _message() 970 # 971 # Local message propagator 972 # 973 def _message(self, message_type, message, **kwargs): 974 args = dict(kwargs) 975 self._context.message( 976 Message(None, message_type, message, **args)) 977 978 # _add_queue() 979 # 980 # Adds a queue to the stream 981 # 982 # Args: 983 # queue (Queue): Queue to add to the pipeline 984 # track (bool): Whether this is the tracking queue 985 # 986 def _add_queue(self, queue, *, track=False): 987 self.queues.append(queue) 988 989 if not (track or self._first_non_track_queue): 990 self._first_non_track_queue = queue 991 992 # _enqueue_plan() 993 # 994 # Enqueues planned elements to the specified queue. 995 # 996 # Args: 997 # plan (list of Element): The list of elements to be enqueued 998 # queue (Queue): The target queue, defaults to the first non-track queue 999 # 1000 def _enqueue_plan(self, plan, *, queue=None): 1001 queue = queue or self._first_non_track_queue 1002 1003 queue.enqueue(plan) 1004 self.session_elements += plan 1005 1006 # _run() 1007 # 1008 # Common function for running the scheduler 1009 # 1010 def _run(self): 1011 1012 # Inform the frontend of the full list of elements 1013 # and the list of elements which will be processed in this run 1014 # 1015 self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL)) 1016 1017 if self._session_start_callback is not None: 1018 self._session_start_callback() 1019 1020 _, status = self._scheduler.run(self.queues) 1021 1022 if status == SchedStatus.ERROR: 1023 raise StreamError() 1024 elif status == SchedStatus.TERMINATED: 1025 raise StreamError(terminated=True) 1026 1027 # _fetch() 1028 # 1029 # Performs the fetch job, the body of this function is here because 1030 # it is shared between a few internals. 1031 # 1032 # Args: 1033 # elements (list of Element): Elements to fetch 1034 # track_elements (list of Element): Elements to track 1035 # 1036 def _fetch(self, elements, *, track_elements=None): 1037 1038 if track_elements is None: 1039 track_elements = [] 1040 1041 # Subtract the track elements from the fetch elements, they will be added separately 1042 fetch_plan = self._pipeline.subtract_elements(elements, track_elements) 1043 1044 # Assert consistency for the fetch elements 1045 self._pipeline.assert_consistent(fetch_plan) 1046 1047 # Filter out elements with cached sources, only from the fetch plan 1048 # let the track plan resolve new refs. 1049 cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] 1050 fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached) 1051 1052 # Construct queues, enqueue and run 1053 # 1054 track_queue = None 1055 if track_elements: 1056 track_queue = TrackQueue(self._scheduler) 1057 self._add_queue(track_queue, track=True) 1058 self._add_queue(FetchQueue(self._scheduler)) 1059 1060 if track_elements: 1061 self._enqueue_plan(track_elements, queue=track_queue) 1062 self._enqueue_plan(fetch_plan) 1063 self._run() 1064 1065 # Helper function for checkout() 1066 # 1067 def _checkout_hardlinks(self, sandbox_root, directory): 1068 try: 1069 removed = utils.safe_remove(directory) 1070 except OSError as e: 1071 raise StreamError("Failed to remove checkout directory: {}".format(e)) from e 1072 1073 if removed: 1074 # Try a simple rename of the sandbox root; if that 1075 # doesnt cut it, then do the regular link files code path 1076 try: 1077 os.rename(sandbox_root, directory) 1078 except OSError: 1079 os.makedirs(directory, exist_ok=True) 1080 utils.link_files(sandbox_root, directory) 1081 else: 1082 utils.link_files(sandbox_root, directory) 1083 1084 # Add a directory entry deterministically to a tar file 1085 # 1086 # This function takes extra steps to ensure the output is deterministic. 1087 # First, it sorts the results of os.listdir() to ensure the ordering of 1088 # the files in the archive is the same. Second, it sets a fixed 1089 # timestamp for each entry. See also https://bugs.python.org/issue24465. 1090 @staticmethod 1091 def _add_directory_to_tarfile(tf, dir_name, dir_arcname, mtime=0): 1092 for filename in sorted(os.listdir(dir_name)): 1093 name = os.path.join(dir_name, filename) 1094 arcname = os.path.join(dir_arcname, filename) 1095 1096 tarinfo = tf.gettarinfo(name, arcname) 1097 tarinfo.mtime = mtime 1098 1099 if tarinfo.isreg(): 1100 with open(name, "rb") as f: 1101 tf.addfile(tarinfo, f) 1102 elif tarinfo.isdir(): 1103 tf.addfile(tarinfo) 1104 Stream._add_directory_to_tarfile(tf, name, arcname, mtime) 1105 else: 1106 tf.addfile(tarinfo) 1107 1108 # Write the element build script to the given directory 1109 def _write_element_script(self, directory, element): 1110 try: 1111 element._write_script(directory) 1112 except ImplError: 1113 return False 1114 return True 1115 1116 # Write all source elements to the given directory 1117 def _write_element_sources(self, directory, elements): 1118 for element in elements: 1119 source_dir = os.path.join(directory, "source") 1120 element_source_dir = os.path.join(source_dir, element.normal_name) 1121 1122 element._stage_sources_at(element_source_dir) 1123 1124 # Write a master build script to the sandbox 1125 def _write_build_script(self, directory, elements): 1126 1127 module_string = "" 1128 for element in elements: 1129 module_string += shlex.quote(element.normal_name) + " " 1130 1131 script_path = os.path.join(directory, "build.sh") 1132 1133 with open(_site.build_all_template, "r") as f: 1134 script_template = f.read() 1135 1136 with utils.save_file_atomic(script_path, "w") as script: 1137 script.write(script_template.format(modules=module_string)) 1138 1139 os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD) 1140 1141 # Collect the sources in the given sandbox into a tarfile 1142 def _collect_sources(self, directory, tar_name, element_name, compression): 1143 with self._context.timed_activity("Creating tarball {}".format(tar_name)): 1144 if compression == "none": 1145 permissions = "w:" 1146 else: 1147 permissions = "w:" + compression 1148 1149 with tarfile.open(tar_name, permissions) as tar: 1150 tar.add(directory, arcname=element_name) 1151