1defmodule Stream do 2 @moduledoc """ 3 Functions for creating and composing streams. 4 5 Streams are composable, lazy enumerables (for an introduction on 6 enumerables, see the `Enum` module). Any enumerable that generates 7 elements one by one during enumeration is called a stream. For example, 8 Elixir's `Range` is a stream: 9 10 iex> range = 1..5 11 1..5 12 iex> Enum.map(range, &(&1 * 2)) 13 [2, 4, 6, 8, 10] 14 15 In the example above, as we mapped over the range, the elements being 16 enumerated were created one by one, during enumeration. The `Stream` 17 module allows us to map the range, without triggering its enumeration: 18 19 iex> range = 1..3 20 iex> stream = Stream.map(range, &(&1 * 2)) 21 iex> Enum.map(stream, &(&1 + 1)) 22 [3, 5, 7] 23 24 Note that we started with a range and then we created a stream that is 25 meant to multiply each element in the range by 2. At this point, no 26 computation was done. Only when `Enum.map/2` is called we actually 27 enumerate over each element in the range, multiplying it by 2 and adding 1. 28 We say the functions in `Stream` are *lazy* and the functions in `Enum` 29 are *eager*. 30 31 Due to their laziness, streams are useful when working with large 32 (or even infinite) collections. When chaining many operations with `Enum`, 33 intermediate lists are created, while `Stream` creates a recipe of 34 computations that are executed at a later moment. Let's see another 35 example: 36 37 1..3 38 |> Enum.map(&IO.inspect(&1)) 39 |> Enum.map(&(&1 * 2)) 40 |> Enum.map(&IO.inspect(&1)) 41 1 42 2 43 3 44 2 45 4 46 6 47 #=> [2, 4, 6] 48 49 Note that we first printed each element in the list, then multiplied each 50 element by 2 and finally printed each new value. In this example, the list 51 was enumerated three times. Let's see an example with streams: 52 53 stream = 1..3 54 |> Stream.map(&IO.inspect(&1)) 55 |> Stream.map(&(&1 * 2)) 56 |> Stream.map(&IO.inspect(&1)) 57 Enum.to_list(stream) 58 1 59 2 60 2 61 4 62 3 63 6 64 #=> [2, 4, 6] 65 66 Although the end result is the same, the order in which the elements were 67 printed changed! With streams, we print the first element and then print 68 its double. In this example, the list was enumerated just once! 69 70 That's what we meant when we said earlier that streams are composable, 71 lazy enumerables. Note that we could call `Stream.map/2` multiple times, 72 effectively composing the streams and keeping them lazy. The computations 73 are only performed when you call a function from the `Enum` module. 74 75 Like with `Enum`, the functions in this module work in linear time. This 76 means that, the time it takes to perform an operation grows at the same 77 rate as the length of the list. This is expected on operations such as 78 `Stream.map/2`. After all, if we want to traverse every element on a 79 stream, the longer the stream, the more elements we need to traverse, 80 and the longer it will take. 81 82 ## Creating Streams 83 84 There are many functions in Elixir's standard library that return 85 streams, some examples are: 86 87 * `IO.stream/2` - streams input lines, one by one 88 * `URI.query_decoder/1` - decodes a query string, pair by pair 89 90 This module also provides many convenience functions for creating streams, 91 like `Stream.cycle/1`, `Stream.unfold/2`, `Stream.resource/3` and more. 92 93 Note the functions in this module are guaranteed to return enumerables. 94 Since enumerables can have different shapes (structs, anonymous functions, 95 and so on), the functions in this module may return any of those shapes 96 and this may change at any time. For example, a function that today 97 returns an anonymous function may return a struct in future releases. 98 """ 99 100 @doc false 101 defstruct enum: nil, funs: [], accs: [], done: nil 102 103 @type acc :: any 104 @type element :: any 105 106 @typedoc "Zero-based index." 107 @type index :: non_neg_integer 108 109 @type default :: any 110 @type timer :: non_neg_integer | :infinity 111 112 # Require Stream.Reducers and its callbacks 113 require Stream.Reducers, as: R 114 115 defmacrop skip(acc) do 116 {:cont, acc} 117 end 118 119 defmacrop next(fun, entry, acc) do 120 quote(do: unquote(fun).(unquote(entry), unquote(acc))) 121 end 122 123 defmacrop acc(head, state, tail) do 124 quote(do: [unquote(head), unquote(state) | unquote(tail)]) 125 end 126 127 defmacrop next_with_acc(fun, entry, head, state, tail) do 128 quote do 129 {reason, [head | tail]} = unquote(fun).(unquote(entry), [unquote(head) | unquote(tail)]) 130 {reason, [head, unquote(state) | tail]} 131 end 132 end 133 134 ## Transformers 135 136 @doc false 137 @deprecated "Use Stream.chunk_every/2 instead" 138 def chunk(enum, n), do: chunk(enum, n, n, nil) 139 140 @doc false 141 @deprecated "Use Stream.chunk_every/3 instead" 142 def chunk(enum, n, step) do 143 chunk_every(enum, n, step, nil) 144 end 145 146 @doc false 147 @deprecated "Use Stream.chunk_every/4 instead" 148 def chunk(enum, n, step, leftover) 149 when is_integer(n) and n > 0 and is_integer(step) and step > 0 do 150 chunk_every(enum, n, step, leftover || :discard) 151 end 152 153 @doc """ 154 Shortcut to `chunk_every(enum, count, count)`. 155 """ 156 @doc since: "1.5.0" 157 @spec chunk_every(Enumerable.t(), pos_integer) :: Enumerable.t() 158 def chunk_every(enum, count), do: chunk_every(enum, count, count, []) 159 160 @doc """ 161 Streams the enumerable in chunks, containing `count` elements each, 162 where each new chunk starts `step` elements into the enumerable. 163 164 `step` is optional and, if not passed, defaults to `count`, i.e. 165 chunks do not overlap. 166 167 If the last chunk does not have `count` elements to fill the chunk, 168 elements are taken from `leftover` to fill in the chunk. If `leftover` 169 does not have enough elements to fill the chunk, then a partial chunk 170 is returned with less than `count` elements. 171 172 If `:discard` is given in `leftover`, the last chunk is discarded 173 unless it has exactly `count` elements. 174 175 ## Examples 176 177 iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 2) |> Enum.to_list() 178 [[1, 2], [3, 4], [5, 6]] 179 180 iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, :discard) |> Enum.to_list() 181 [[1, 2, 3], [3, 4, 5]] 182 183 iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7]) |> Enum.to_list() 184 [[1, 2, 3], [3, 4, 5], [5, 6, 7]] 185 186 iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 3, []) |> Enum.to_list() 187 [[1, 2, 3], [4, 5, 6]] 188 189 """ 190 @doc since: "1.5.0" 191 @spec chunk_every(Enumerable.t(), pos_integer, pos_integer, Enumerable.t() | :discard) :: 192 Enumerable.t() 193 def chunk_every(enum, count, step, leftover \\ []) 194 when is_integer(count) and count > 0 and is_integer(step) and step > 0 do 195 R.chunk_every(&chunk_while/4, enum, count, step, leftover) 196 end 197 198 @doc """ 199 Chunks the `enum` by buffering elements for which `fun` returns the same value. 200 201 Elements are only emitted when `fun` returns a new value or the `enum` finishes. 202 203 ## Examples 204 205 iex> stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1)) 206 iex> Enum.to_list(stream) 207 [[1], [2, 2], [3], [4, 4, 6], [7, 7]] 208 209 """ 210 @spec chunk_by(Enumerable.t(), (element -> any)) :: Enumerable.t() 211 def chunk_by(enum, fun) when is_function(fun, 1) do 212 R.chunk_by(&chunk_while/4, enum, fun) 213 end 214 215 @doc """ 216 Chunks the `enum` with fine grained control when every chunk is emitted. 217 218 `chunk_fun` receives the current element and the accumulator and 219 must return `{:cont, element, acc}` to emit the given chunk and 220 continue with accumulator or `{:cont, acc}` to not emit any chunk 221 and continue with the return accumulator. 222 223 `after_fun` is invoked when iteration is done and must also return 224 `{:cont, element, acc}` or `{:cont, acc}`. 225 226 ## Examples 227 228 iex> chunk_fun = fn element, acc -> 229 ...> if rem(element, 2) == 0 do 230 ...> {:cont, Enum.reverse([element | acc]), []} 231 ...> else 232 ...> {:cont, [element | acc]} 233 ...> end 234 ...> end 235 iex> after_fun = fn 236 ...> [] -> {:cont, []} 237 ...> acc -> {:cont, Enum.reverse(acc), []} 238 ...> end 239 iex> stream = Stream.chunk_while(1..10, [], chunk_fun, after_fun) 240 iex> Enum.to_list(stream) 241 [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] 242 243 """ 244 @doc since: "1.5.0" 245 @spec chunk_while( 246 Enumerable.t(), 247 acc, 248 (element, acc -> {:cont, chunk, acc} | {:cont, acc} | {:halt, acc}), 249 (acc -> {:cont, chunk, acc} | {:cont, acc}) 250 ) :: Enumerable.t() 251 when chunk: any 252 def chunk_while(enum, acc, chunk_fun, after_fun) 253 when is_function(chunk_fun, 2) and is_function(after_fun, 1) do 254 lazy( 255 enum, 256 [acc | after_fun], 257 fn f1 -> chunk_while_fun(chunk_fun, f1) end, 258 &after_chunk_while/2 259 ) 260 end 261 262 defp chunk_while_fun(callback, fun) do 263 fn entry, acc(head, [acc | after_fun], tail) -> 264 case callback.(entry, acc) do 265 {:cont, emit, acc} -> 266 # If we emit an element and then we have to halt, 267 # we need to disable the after_fun callback to 268 # avoid emitting even more elements. 269 case next(fun, emit, [head | tail]) do 270 {:halt, [head | tail]} -> {:halt, acc(head, [acc | &{:cont, &1}], tail)} 271 {command, [head | tail]} -> {command, acc(head, [acc | after_fun], tail)} 272 end 273 274 {:cont, acc} -> 275 skip(acc(head, [acc | after_fun], tail)) 276 277 {:halt, acc} -> 278 {:halt, acc(head, [acc | after_fun], tail)} 279 end 280 end 281 end 282 283 defp after_chunk_while(acc(h, [acc | after_fun], t), f1) do 284 case after_fun.(acc) do 285 {:cont, emit, acc} -> next_with_acc(f1, emit, h, [acc | after_fun], t) 286 {:cont, acc} -> {:cont, acc(h, [acc | after_fun], t)} 287 end 288 end 289 290 @doc """ 291 Creates a stream that only emits elements if they are different from the last emitted element. 292 293 This function only ever needs to store the last emitted element. 294 295 Elements are compared using `===/2`. 296 297 ## Examples 298 299 iex> Stream.dedup([1, 2, 3, 3, 2, 1]) |> Enum.to_list() 300 [1, 2, 3, 2, 1] 301 302 """ 303 @spec dedup(Enumerable.t()) :: Enumerable.t() 304 def dedup(enum) do 305 dedup_by(enum, fn x -> x end) 306 end 307 308 @doc """ 309 Creates a stream that only emits elements if the result of calling `fun` on the element is 310 different from the (stored) result of calling `fun` on the last emitted element. 311 312 ## Examples 313 314 iex> Stream.dedup_by([{1, :x}, {2, :y}, {2, :z}, {1, :x}], fn {x, _} -> x end) |> Enum.to_list() 315 [{1, :x}, {2, :y}, {1, :x}] 316 317 """ 318 @spec dedup_by(Enumerable.t(), (element -> term)) :: Enumerable.t() 319 def dedup_by(enum, fun) when is_function(fun, 1) do 320 lazy(enum, nil, fn f1 -> R.dedup(fun, f1) end) 321 end 322 323 @doc """ 324 Lazily drops the next `n` elements from the enumerable. 325 326 If a negative `n` is given, it will drop the last `n` elements from 327 the collection. Note that the mechanism by which this is implemented 328 will delay the emission of any element until `n` additional elements have 329 been emitted by the enum. 330 331 ## Examples 332 333 iex> stream = Stream.drop(1..10, 5) 334 iex> Enum.to_list(stream) 335 [6, 7, 8, 9, 10] 336 337 iex> stream = Stream.drop(1..10, -5) 338 iex> Enum.to_list(stream) 339 [1, 2, 3, 4, 5] 340 341 """ 342 @spec drop(Enumerable.t(), integer) :: Enumerable.t() 343 def drop(enum, n) when is_integer(n) and n >= 0 do 344 lazy(enum, n, fn f1 -> R.drop(f1) end) 345 end 346 347 def drop(enum, n) when is_integer(n) and n < 0 do 348 n = abs(n) 349 350 lazy(enum, {0, [], []}, fn f1 -> 351 fn 352 entry, [h, {count, buf1, []} | t] -> 353 do_drop(:cont, n, entry, h, count, buf1, [], t) 354 355 entry, [h, {count, buf1, [next | buf2]} | t] -> 356 {reason, [h | t]} = f1.(next, [h | t]) 357 do_drop(reason, n, entry, h, count, buf1, buf2, t) 358 end 359 end) 360 end 361 362 defp do_drop(reason, n, entry, h, count, buf1, buf2, t) do 363 buf1 = [entry | buf1] 364 count = count + 1 365 366 if count == n do 367 {reason, [h, {0, [], :lists.reverse(buf1)} | t]} 368 else 369 {reason, [h, {count, buf1, buf2} | t]} 370 end 371 end 372 373 @doc """ 374 Creates a stream that drops every `nth` element from the enumerable. 375 376 The first element is always dropped, unless `nth` is 0. 377 378 `nth` must be a non-negative integer. 379 380 ## Examples 381 382 iex> stream = Stream.drop_every(1..10, 2) 383 iex> Enum.to_list(stream) 384 [2, 4, 6, 8, 10] 385 386 iex> stream = Stream.drop_every(1..1000, 1) 387 iex> Enum.to_list(stream) 388 [] 389 390 iex> stream = Stream.drop_every([1, 2, 3, 4, 5], 0) 391 iex> Enum.to_list(stream) 392 [1, 2, 3, 4, 5] 393 394 """ 395 @spec drop_every(Enumerable.t(), non_neg_integer) :: Enumerable.t() 396 def drop_every(enum, nth) 397 def drop_every(enum, 0), do: %Stream{enum: enum} 398 def drop_every([], _nth), do: %Stream{enum: []} 399 400 def drop_every(enum, nth) when is_integer(nth) and nth > 0 do 401 lazy(enum, nth, fn f1 -> R.drop_every(nth, f1) end) 402 end 403 404 @doc """ 405 Lazily drops elements of the enumerable while the given 406 function returns a truthy value. 407 408 ## Examples 409 410 iex> stream = Stream.drop_while(1..10, &(&1 <= 5)) 411 iex> Enum.to_list(stream) 412 [6, 7, 8, 9, 10] 413 414 """ 415 @spec drop_while(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t() 416 def drop_while(enum, fun) when is_function(fun, 1) do 417 lazy(enum, true, fn f1 -> R.drop_while(fun, f1) end) 418 end 419 420 @doc """ 421 Executes the given function for each element. 422 423 Useful for adding side effects (like printing) to a stream. 424 425 ## Examples 426 427 iex> stream = Stream.each([1, 2, 3], fn x -> send(self(), x) end) 428 iex> Enum.to_list(stream) 429 iex> receive do: (x when is_integer(x) -> x) 430 1 431 iex> receive do: (x when is_integer(x) -> x) 432 2 433 iex> receive do: (x when is_integer(x) -> x) 434 3 435 436 """ 437 @spec each(Enumerable.t(), (element -> term)) :: Enumerable.t() 438 def each(enum, fun) when is_function(fun, 1) do 439 lazy(enum, fn f1 -> 440 fn x, acc -> 441 fun.(x) 442 f1.(x, acc) 443 end 444 end) 445 end 446 447 @doc """ 448 Maps the given `fun` over `enumerable` and flattens the result. 449 450 This function returns a new stream built by appending the result of invoking `fun` 451 on each element of `enumerable` together. 452 453 ## Examples 454 455 iex> stream = Stream.flat_map([1, 2, 3], fn x -> [x, x * 2] end) 456 iex> Enum.to_list(stream) 457 [1, 2, 2, 4, 3, 6] 458 459 iex> stream = Stream.flat_map([1, 2, 3], fn x -> [[x]] end) 460 iex> Enum.to_list(stream) 461 [[1], [2], [3]] 462 463 """ 464 @spec flat_map(Enumerable.t(), (element -> Enumerable.t())) :: Enumerable.t() 465 def flat_map(enum, mapper) when is_function(mapper, 1) do 466 transform(enum, nil, fn val, nil -> {mapper.(val), nil} end) 467 end 468 469 @doc """ 470 Creates a stream that filters elements according to 471 the given function on enumeration. 472 473 ## Examples 474 475 iex> stream = Stream.filter([1, 2, 3], fn x -> rem(x, 2) == 0 end) 476 iex> Enum.to_list(stream) 477 [2] 478 479 """ 480 @spec filter(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t() 481 def filter(enum, fun) when is_function(fun, 1) do 482 lazy(enum, fn f1 -> R.filter(fun, f1) end) 483 end 484 485 @doc false 486 @deprecated "Use Stream.filter/2 + Stream.map/2 instead" 487 def filter_map(enum, filter, mapper) do 488 lazy(enum, fn f1 -> R.filter_map(filter, mapper, f1) end) 489 end 490 491 @doc """ 492 Creates a stream that emits a value after the given period `n` 493 in milliseconds. 494 495 The values emitted are an increasing counter starting at `0`. 496 This operation will block the caller by the given interval 497 every time a new element is streamed. 498 499 Do not use this function to generate a sequence of numbers. 500 If blocking the caller process is not necessary, use 501 `Stream.iterate(0, & &1 + 1)` instead. 502 503 ## Examples 504 505 iex> Stream.interval(10) |> Enum.take(10) 506 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 507 508 """ 509 @spec interval(timer()) :: Enumerable.t() 510 def interval(n) 511 when is_integer(n) and n >= 0 512 when n == :infinity do 513 unfold(0, fn count -> 514 Process.sleep(n) 515 {count, count + 1} 516 end) 517 end 518 519 @doc """ 520 Injects the stream values into the given collectable as a side-effect. 521 522 This function is often used with `run/1` since any evaluation 523 is delayed until the stream is executed. See `run/1` for an example. 524 """ 525 @spec into(Enumerable.t(), Collectable.t(), (term -> term)) :: Enumerable.t() 526 def into(enum, collectable, transform \\ fn x -> x end) when is_function(transform, 1) do 527 &do_into(enum, collectable, transform, &1, &2) 528 end 529 530 defp do_into(enum, collectable, transform, acc, fun) do 531 {initial, into} = Collectable.into(collectable) 532 533 composed = fn x, [acc | collectable] -> 534 collectable = into.(collectable, {:cont, transform.(x)}) 535 {reason, acc} = fun.(x, acc) 536 {reason, [acc | collectable]} 537 end 538 539 do_into(&Enumerable.reduce(enum, &1, composed), initial, into, acc) 540 end 541 542 defp do_into(reduce, collectable, into, {command, acc}) do 543 try do 544 reduce.({command, [acc | collectable]}) 545 catch 546 kind, reason -> 547 into.(collectable, :halt) 548 :erlang.raise(kind, reason, __STACKTRACE__) 549 else 550 {:suspended, [acc | collectable], continuation} -> 551 {:suspended, acc, &do_into(continuation, collectable, into, &1)} 552 553 {reason, [acc | collectable]} -> 554 into.(collectable, :done) 555 {reason, acc} 556 end 557 end 558 559 @doc """ 560 Creates a stream that will apply the given function on 561 enumeration. 562 563 ## Examples 564 565 iex> stream = Stream.map([1, 2, 3], fn x -> x * 2 end) 566 iex> Enum.to_list(stream) 567 [2, 4, 6] 568 569 """ 570 @spec map(Enumerable.t(), (element -> any)) :: Enumerable.t() 571 def map(enum, fun) when is_function(fun, 1) do 572 lazy(enum, fn f1 -> R.map(fun, f1) end) 573 end 574 575 @doc """ 576 Creates a stream that will apply the given function on 577 every `nth` element from the enumerable. 578 579 The first element is always passed to the given function. 580 581 `nth` must be a non-negative integer. 582 583 ## Examples 584 585 iex> stream = Stream.map_every(1..10, 2, fn x -> x * 2 end) 586 iex> Enum.to_list(stream) 587 [2, 2, 6, 4, 10, 6, 14, 8, 18, 10] 588 589 iex> stream = Stream.map_every([1, 2, 3, 4, 5], 1, fn x -> x * 2 end) 590 iex> Enum.to_list(stream) 591 [2, 4, 6, 8, 10] 592 593 iex> stream = Stream.map_every(1..5, 0, fn x -> x * 2 end) 594 iex> Enum.to_list(stream) 595 [1, 2, 3, 4, 5] 596 597 """ 598 @doc since: "1.4.0" 599 @spec map_every(Enumerable.t(), non_neg_integer, (element -> any)) :: Enumerable.t() 600 def map_every(enum, nth, fun) when is_integer(nth) and nth >= 0 and is_function(fun, 1) do 601 map_every_after_guards(enum, nth, fun) 602 end 603 604 defp map_every_after_guards(enum, 1, fun), do: map(enum, fun) 605 defp map_every_after_guards(enum, 0, _fun), do: %Stream{enum: enum} 606 defp map_every_after_guards([], _nth, _fun), do: %Stream{enum: []} 607 608 defp map_every_after_guards(enum, nth, fun) do 609 lazy(enum, nth, fn f1 -> R.map_every(nth, fun, f1) end) 610 end 611 612 @doc """ 613 Creates a stream that will reject elements according to 614 the given function on enumeration. 615 616 ## Examples 617 618 iex> stream = Stream.reject([1, 2, 3], fn x -> rem(x, 2) == 0 end) 619 iex> Enum.to_list(stream) 620 [1, 3] 621 622 """ 623 @spec reject(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t() 624 def reject(enum, fun) when is_function(fun, 1) do 625 lazy(enum, fn f1 -> R.reject(fun, f1) end) 626 end 627 628 @doc """ 629 Runs the given stream. 630 631 This is useful when a stream needs to be run, for side effects, 632 and there is no interest in its return result. 633 634 ## Examples 635 636 Open up a file, replace all `#` by `%` and stream to another file 637 without loading the whole file in memory: 638 639 File.stream!("/path/to/file") 640 |> Stream.map(&String.replace(&1, "#", "%")) 641 |> Stream.into(File.stream!("/path/to/other/file")) 642 |> Stream.run() 643 644 No computation will be done until we call one of the `Enum` functions 645 or `run/1`. 646 """ 647 @spec run(Enumerable.t()) :: :ok 648 def run(stream) do 649 _ = Enumerable.reduce(stream, {:cont, nil}, fn _, _ -> {:cont, nil} end) 650 :ok 651 end 652 653 @doc """ 654 Creates a stream that applies the given function to each 655 element, emits the result and uses the same result as the accumulator 656 for the next computation. Uses the first element in the enumerable 657 as the starting value. 658 659 ## Examples 660 661 iex> stream = Stream.scan(1..5, &(&1 + &2)) 662 iex> Enum.to_list(stream) 663 [1, 3, 6, 10, 15] 664 665 """ 666 @spec scan(Enumerable.t(), (element, acc -> any)) :: Enumerable.t() 667 def scan(enum, fun) when is_function(fun, 2) do 668 lazy(enum, :first, fn f1 -> R.scan2(fun, f1) end) 669 end 670 671 @doc """ 672 Creates a stream that applies the given function to each 673 element, emits the result and uses the same result as the accumulator 674 for the next computation. Uses the given `acc` as the starting value. 675 676 ## Examples 677 678 iex> stream = Stream.scan(1..5, 0, &(&1 + &2)) 679 iex> Enum.to_list(stream) 680 [1, 3, 6, 10, 15] 681 682 """ 683 @spec scan(Enumerable.t(), acc, (element, acc -> any)) :: Enumerable.t() 684 def scan(enum, acc, fun) when is_function(fun, 2) do 685 lazy(enum, acc, fn f1 -> R.scan3(fun, f1) end) 686 end 687 688 @doc """ 689 Lazily takes the next `count` elements from the enumerable and stops 690 enumeration. 691 692 If a negative `count` is given, the last `count` values will be taken. 693 For such, the collection is fully enumerated keeping up to `2 * count` 694 elements in memory. Once the end of the collection is reached, 695 the last `count` elements will be executed. Therefore, using 696 a negative `count` on an infinite collection will never return. 697 698 ## Examples 699 700 iex> stream = Stream.take(1..100, 5) 701 iex> Enum.to_list(stream) 702 [1, 2, 3, 4, 5] 703 704 iex> stream = Stream.take(1..100, -5) 705 iex> Enum.to_list(stream) 706 [96, 97, 98, 99, 100] 707 708 iex> stream = Stream.cycle([1, 2, 3]) |> Stream.take(5) 709 iex> Enum.to_list(stream) 710 [1, 2, 3, 1, 2] 711 712 """ 713 @spec take(Enumerable.t(), integer) :: Enumerable.t() 714 def take(enum, count) when is_integer(count) do 715 take_after_guards(enum, count) 716 end 717 718 defp take_after_guards(_enum, 0), do: %Stream{enum: []} 719 720 defp take_after_guards([], _count), do: %Stream{enum: []} 721 722 defp take_after_guards(enum, count) when count > 0 do 723 lazy(enum, count, fn f1 -> R.take(f1) end) 724 end 725 726 defp take_after_guards(enum, count) when count < 0 do 727 &Enumerable.reduce(Enum.take(enum, count), &1, &2) 728 end 729 730 @doc """ 731 Creates a stream that takes every `nth` element from the enumerable. 732 733 The first element is always included, unless `nth` is 0. 734 735 `nth` must be a non-negative integer. 736 737 ## Examples 738 739 iex> stream = Stream.take_every(1..10, 2) 740 iex> Enum.to_list(stream) 741 [1, 3, 5, 7, 9] 742 743 iex> stream = Stream.take_every([1, 2, 3, 4, 5], 1) 744 iex> Enum.to_list(stream) 745 [1, 2, 3, 4, 5] 746 747 iex> stream = Stream.take_every(1..1000, 0) 748 iex> Enum.to_list(stream) 749 [] 750 751 """ 752 @spec take_every(Enumerable.t(), non_neg_integer) :: Enumerable.t() 753 def take_every(enum, nth) when is_integer(nth) and nth >= 0 do 754 take_every_after_guards(enum, nth) 755 end 756 757 defp take_every_after_guards(_enum, 0), do: %Stream{enum: []} 758 759 defp take_every_after_guards([], _nth), do: %Stream{enum: []} 760 761 defp take_every_after_guards(enum, nth) do 762 lazy(enum, nth, fn f1 -> R.take_every(nth, f1) end) 763 end 764 765 @doc """ 766 Lazily takes elements of the enumerable while the given 767 function returns a truthy value. 768 769 ## Examples 770 771 iex> stream = Stream.take_while(1..100, &(&1 <= 5)) 772 iex> Enum.to_list(stream) 773 [1, 2, 3, 4, 5] 774 775 """ 776 @spec take_while(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t() 777 def take_while(enum, fun) when is_function(fun, 1) do 778 lazy(enum, fn f1 -> R.take_while(fun, f1) end) 779 end 780 781 @doc """ 782 Creates a stream that emits a single value after `n` milliseconds. 783 784 The value emitted is `0`. This operation will block the caller by 785 the given time until the element is streamed. 786 787 ## Examples 788 789 iex> Stream.timer(10) |> Enum.to_list() 790 [0] 791 792 """ 793 @spec timer(timer()) :: Enumerable.t() 794 def timer(n) 795 when is_integer(n) and n >= 0 796 when n == :infinity do 797 take(interval(n), 1) 798 end 799 800 @doc """ 801 Transforms an existing stream. 802 803 It expects an accumulator and a function that receives each stream element 804 and an accumulator. It must return a tuple, where the first element is a new 805 stream (often a list) or the atom `:halt`, and the second element is the 806 accumulator to be used by the next element, if any, in both cases. 807 808 Note: this function is equivalent to `Enum.flat_map_reduce/3`, except this 809 function does not return the accumulator once the stream is processed. 810 811 ## Examples 812 813 `Stream.transform/3` is useful as it can be used as the basis to implement 814 many of the functions defined in this module. For example, we can implement 815 `Stream.take(enum, n)` as follows: 816 817 iex> enum = 1001..9999 818 iex> n = 3 819 iex> stream = Stream.transform(enum, 0, fn i, acc -> 820 ...> if acc < n, do: {[i], acc + 1}, else: {:halt, acc} 821 ...> end) 822 iex> Enum.to_list(stream) 823 [1001, 1002, 1003] 824 825 """ 826 @spec transform(Enumerable.t(), acc, fun) :: Enumerable.t() 827 when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}), 828 acc: any 829 def transform(enum, acc, reducer) when is_function(reducer, 2) do 830 &do_transform(enum, fn -> acc end, reducer, &1, &2, nil) 831 end 832 833 @doc """ 834 Transforms an existing stream with function-based start and finish. 835 836 The accumulator is only calculated when transformation starts. It also 837 allows an after function to be given which is invoked when the stream 838 halts or completes. 839 840 This function can be seen as a combination of `Stream.resource/3` with 841 `Stream.transform/3`. 842 """ 843 @spec transform(Enumerable.t(), (() -> acc), fun, (acc -> term)) :: Enumerable.t() 844 when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}), 845 acc: any 846 def transform(enum, start_fun, reducer, after_fun) 847 when is_function(start_fun, 0) and is_function(reducer, 2) and is_function(after_fun, 1) do 848 &do_transform(enum, start_fun, reducer, &1, &2, after_fun) 849 end 850 851 defp do_transform(enumerables, user_acc, user, inner_acc, fun, after_fun) do 852 inner = &do_transform_each(&1, &2, fun) 853 step = &do_transform_step(&1, &2) 854 next = &Enumerable.reduce(enumerables, &1, step) 855 funs = {user, fun, inner, after_fun} 856 do_transform(user_acc.(), :cont, next, inner_acc, funs) 857 end 858 859 defp do_transform(user_acc, _next_op, next, {:halt, inner_acc}, funs) do 860 {_, _, _, after_fun} = funs 861 next.({:halt, []}) 862 do_after(after_fun, user_acc) 863 {:halted, inner_acc} 864 end 865 866 defp do_transform(user_acc, next_op, next, {:suspend, inner_acc}, funs) do 867 {:suspended, inner_acc, &do_transform(user_acc, next_op, next, &1, funs)} 868 end 869 870 defp do_transform(user_acc, :halt, _next, {_, inner_acc}, funs) do 871 {_, _, _, after_fun} = funs 872 do_after(after_fun, user_acc) 873 {:halted, inner_acc} 874 end 875 876 defp do_transform(user_acc, :cont, next, inner_acc, funs) do 877 {_, _, _, after_fun} = funs 878 879 try do 880 next.({:cont, []}) 881 catch 882 kind, reason -> 883 do_after(after_fun, user_acc) 884 :erlang.raise(kind, reason, __STACKTRACE__) 885 else 886 {:suspended, vals, next} -> 887 do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs) 888 889 {_, vals} -> 890 do_transform_user(:lists.reverse(vals), user_acc, :halt, next, inner_acc, funs) 891 end 892 end 893 894 defp do_transform_user([], user_acc, next_op, next, inner_acc, funs) do 895 do_transform(user_acc, next_op, next, inner_acc, funs) 896 end 897 898 defp do_transform_user([val | vals], user_acc, next_op, next, inner_acc, funs) do 899 {user, fun, inner, after_fun} = funs 900 901 try do 902 user.(val, user_acc) 903 catch 904 kind, reason -> 905 next.({:halt, []}) 906 do_after(after_fun, user_acc) 907 :erlang.raise(kind, reason, __STACKTRACE__) 908 else 909 {[], user_acc} -> 910 do_transform_user(vals, user_acc, next_op, next, inner_acc, funs) 911 912 {list, user_acc} when is_list(list) -> 913 reduce = &Enumerable.List.reduce(list, &1, fun) 914 do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) 915 916 {:halt, user_acc} -> 917 next.({:halt, []}) 918 do_after(after_fun, user_acc) 919 {:halted, elem(inner_acc, 1)} 920 921 {other, user_acc} -> 922 reduce = &Enumerable.reduce(other, &1, inner) 923 do_enum_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) 924 end 925 end 926 927 defp do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) do 928 {_, _, _, after_fun} = funs 929 930 try do 931 reduce.(inner_acc) 932 catch 933 kind, reason -> 934 next.({:halt, []}) 935 do_after(after_fun, user_acc) 936 :erlang.raise(kind, reason, __STACKTRACE__) 937 else 938 {:done, acc} -> 939 do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs) 940 941 {:halted, acc} -> 942 next.({:halt, []}) 943 do_after(after_fun, user_acc) 944 {:halted, acc} 945 946 {:suspended, acc, continuation} -> 947 resume = &do_list_transform(vals, user_acc, next_op, next, &1, continuation, funs) 948 {:suspended, acc, resume} 949 end 950 end 951 952 defp do_enum_transform(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do 953 {_, _, _, after_fun} = funs 954 955 try do 956 reduce.({op, [:outer | inner_acc]}) 957 catch 958 kind, reason -> 959 next.({:halt, []}) 960 do_after(after_fun, user_acc) 961 :erlang.raise(kind, reason, __STACKTRACE__) 962 else 963 # Only take into account outer halts when the op is not halt itself. 964 # Otherwise, we were the ones wishing to halt, so we should just stop. 965 {:halted, [:outer | acc]} when op != :halt -> 966 do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs) 967 968 {:halted, [_ | acc]} -> 969 next.({:halt, []}) 970 do_after(after_fun, user_acc) 971 {:halted, acc} 972 973 {:done, [_ | acc]} -> 974 do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs) 975 976 {:suspended, [_ | acc], continuation} -> 977 resume = &do_enum_transform(vals, user_acc, next_op, next, &1, continuation, funs) 978 {:suspended, acc, resume} 979 end 980 end 981 982 defp do_after(nil, _user_acc), do: :ok 983 defp do_after(fun, user_acc), do: fun.(user_acc) 984 985 defp do_transform_each(x, [:outer | acc], f) do 986 case f.(x, acc) do 987 {:halt, res} -> {:halt, [:inner | res]} 988 {op, res} -> {op, [:outer | res]} 989 end 990 end 991 992 defp do_transform_step(x, acc) do 993 {:suspend, [x | acc]} 994 end 995 996 @doc """ 997 Creates a stream that only emits elements if they are unique. 998 999 Keep in mind that, in order to know if an element is unique 1000 or not, this function needs to store all unique values emitted 1001 by the stream. Therefore, if the stream is infinite, the number 1002 of elements stored will grow infinitely, never being garbage-collected. 1003 1004 ## Examples 1005 1006 iex> Stream.uniq([1, 2, 3, 3, 2, 1]) |> Enum.to_list() 1007 [1, 2, 3] 1008 1009 """ 1010 @spec uniq(Enumerable.t()) :: Enumerable.t() 1011 def uniq(enum) do 1012 uniq_by(enum, fn x -> x end) 1013 end 1014 1015 @doc false 1016 @deprecated "Use Stream.uniq_by/2 instead" 1017 def uniq(enum, fun) do 1018 uniq_by(enum, fun) 1019 end 1020 1021 @doc """ 1022 Creates a stream that only emits elements if they are unique, by removing the 1023 elements for which function `fun` returned duplicate elements. 1024 1025 The function `fun` maps every element to a term which is used to 1026 determine if two elements are duplicates. 1027 1028 Keep in mind that, in order to know if an element is unique 1029 or not, this function needs to store all unique values emitted 1030 by the stream. Therefore, if the stream is infinite, the number 1031 of elements stored will grow infinitely, never being garbage-collected. 1032 1033 ## Example 1034 1035 iex> Stream.uniq_by([{1, :x}, {2, :y}, {1, :z}], fn {x, _} -> x end) |> Enum.to_list() 1036 [{1, :x}, {2, :y}] 1037 1038 iex> Stream.uniq_by([a: {:tea, 2}, b: {:tea, 2}, c: {:coffee, 1}], fn {_, y} -> y end) |> Enum.to_list() 1039 [a: {:tea, 2}, c: {:coffee, 1}] 1040 1041 """ 1042 @spec uniq_by(Enumerable.t(), (element -> term)) :: Enumerable.t() 1043 def uniq_by(enum, fun) when is_function(fun, 1) do 1044 lazy(enum, %{}, fn f1 -> R.uniq_by(fun, f1) end) 1045 end 1046 1047 @doc """ 1048 Creates a stream where each element in the enumerable will 1049 be wrapped in a tuple alongside its index. 1050 1051 If an `offset` is given, we will index from the given offset instead of from zero. 1052 1053 ## Examples 1054 1055 iex> stream = Stream.with_index([1, 2, 3]) 1056 iex> Enum.to_list(stream) 1057 [{1, 0}, {2, 1}, {3, 2}] 1058 1059 iex> stream = Stream.with_index([1, 2, 3], 3) 1060 iex> Enum.to_list(stream) 1061 [{1, 3}, {2, 4}, {3, 5}] 1062 1063 """ 1064 @spec with_index(Enumerable.t(), integer) :: Enumerable.t() 1065 def with_index(enum, offset \\ 0) when is_integer(offset) do 1066 lazy(enum, offset, fn f1 -> R.with_index(f1) end) 1067 end 1068 1069 ## Combiners 1070 1071 @doc """ 1072 Creates a stream that enumerates each enumerable in an enumerable. 1073 1074 ## Examples 1075 1076 iex> stream = Stream.concat([1..3, 4..6, 7..9]) 1077 iex> Enum.to_list(stream) 1078 [1, 2, 3, 4, 5, 6, 7, 8, 9] 1079 1080 """ 1081 @spec concat(Enumerable.t()) :: Enumerable.t() 1082 def concat(enumerables) do 1083 flat_map(enumerables, & &1) 1084 end 1085 1086 @doc """ 1087 Creates a stream that enumerates the first argument, followed by the second. 1088 1089 ## Examples 1090 1091 iex> stream = Stream.concat(1..3, 4..6) 1092 iex> Enum.to_list(stream) 1093 [1, 2, 3, 4, 5, 6] 1094 1095 iex> stream1 = Stream.cycle([1, 2, 3]) 1096 iex> stream2 = Stream.cycle([4, 5, 6]) 1097 iex> stream = Stream.concat(stream1, stream2) 1098 iex> Enum.take(stream, 6) 1099 [1, 2, 3, 1, 2, 3] 1100 1101 """ 1102 @spec concat(Enumerable.t(), Enumerable.t()) :: Enumerable.t() 1103 def concat(first, second) do 1104 flat_map([first, second], & &1) 1105 end 1106 1107 @doc """ 1108 Zips two enumerables together, lazily. 1109 1110 The zipping finishes as soon as either enumerable completes. 1111 1112 ## Examples 1113 1114 iex> concat = Stream.concat(1..3, 4..6) 1115 iex> cycle = Stream.cycle([:a, :b, :c]) 1116 iex> Stream.zip(concat, cycle) |> Enum.to_list() 1117 [{1, :a}, {2, :b}, {3, :c}, {4, :a}, {5, :b}, {6, :c}] 1118 1119 """ 1120 @spec zip(Enumerable.t(), Enumerable.t()) :: Enumerable.t() 1121 def zip(enumerable1, enumerable2) do 1122 zip_with(enumerable1, enumerable2, fn left, right -> {left, right} end) 1123 end 1124 1125 @doc """ 1126 Zips corresponding elements from a finite collection of enumerables 1127 into one stream of tuples. 1128 1129 The zipping finishes as soon as any enumerable in the given collection completes. 1130 1131 ## Examples 1132 1133 iex> concat = Stream.concat(1..3, 4..6) 1134 iex> cycle = Stream.cycle(["foo", "bar", "baz"]) 1135 iex> Stream.zip([concat, [:a, :b, :c], cycle]) |> Enum.to_list() 1136 [{1, :a, "foo"}, {2, :b, "bar"}, {3, :c, "baz"}] 1137 1138 """ 1139 @doc since: "1.4.0" 1140 @spec zip(enumerables) :: Enumerable.t() when enumerables: [Enumerable.t()] | Enumerable.t() 1141 def zip(enumerables) do 1142 zip_with(enumerables, &List.to_tuple(&1)) 1143 end 1144 1145 @doc """ 1146 Lazily zips corresponding elements from two enumerables into a new one, transforming them with 1147 the `zip_fun` function as it goes. 1148 1149 The `zip_fun` will be called with the first element from `enumerable1` and the first 1150 element from `enumerable2`, then with the second element from each, and so on until 1151 either one of the enumerables completes. 1152 1153 ## Examples 1154 1155 iex> concat = Stream.concat(1..3, 4..6) 1156 iex> Stream.zip_with(concat, concat, fn a, b -> a + b end) |> Enum.to_list() 1157 [2, 4, 6, 8, 10, 12] 1158 1159 """ 1160 @doc since: "1.12.0" 1161 @spec zip_with(Enumerable.t(), Enumerable.t(), (term, term -> term)) :: Enumerable.t() 1162 def zip_with(enumerable1, enumerable2, zip_fun) 1163 when is_list(enumerable1) and is_list(enumerable2) and is_function(zip_fun, 2) do 1164 &zip_pair(enumerable1, enumerable2, &1, &2, zip_fun) 1165 end 1166 1167 def zip_with(enumerable1, enumerable2, zip_fun) when is_function(zip_fun, 2) do 1168 zip_with([enumerable1, enumerable2], fn [left, right] -> zip_fun.(left, right) end) 1169 end 1170 1171 defp zip_pair(_list1, _list2, {:halt, acc}, _fun, _zip_fun) do 1172 {:halted, acc} 1173 end 1174 1175 defp zip_pair(list1, list2, {:suspend, acc}, fun, zip_fun) do 1176 {:suspended, acc, &zip_pair(list1, list2, &1, fun, zip_fun)} 1177 end 1178 1179 defp zip_pair([], _list2, {:cont, acc}, _fun, _zip_fun), do: {:done, acc} 1180 defp zip_pair(_list1, [], {:cont, acc}, _fun, _zip_fun), do: {:done, acc} 1181 1182 defp zip_pair([head1 | tail1], [head2 | tail2], {:cont, acc}, fun, zip_fun) do 1183 zip_pair(tail1, tail2, fun.(zip_fun.(head1, head2), acc), fun, zip_fun) 1184 end 1185 1186 @doc """ 1187 Lazily zips corresponding elements from a finite collection of enumerables into a new 1188 enumerable, transforming them with the `zip_fun` function as it goes. 1189 1190 The first element from each of the enums in `enumerables` will be put into a list which is then passed to 1191 the 1-arity `zip_fun` function. Then, the second elements from each of the enums are put into a list and passed to 1192 `zip_fun`, and so on until any one of the enums in `enumerables` completes. 1193 1194 Returns a new enumerable with the results of calling `zip_fun`. 1195 1196 ## Examples 1197 1198 iex> concat = Stream.concat(1..3, 4..6) 1199 iex> Stream.zip_with([concat, concat], fn [a, b] -> a + b end) |> Enum.to_list() 1200 [2, 4, 6, 8, 10, 12] 1201 1202 iex> concat = Stream.concat(1..3, 4..6) 1203 iex> Stream.zip_with([concat, concat, 1..3], fn [a, b, c] -> a + b + c end) |> Enum.to_list() 1204 [3, 6, 9] 1205 1206 """ 1207 @doc since: "1.12.0" 1208 @spec zip_with(enumerables, (Enumerable.t() -> term)) :: Enumerable.t() 1209 when enumerables: [Enumerable.t()] | Enumerable.t() 1210 def zip_with(enumerables, zip_fun) when is_function(zip_fun, 1) do 1211 if is_list(enumerables) and :lists.all(&is_list/1, enumerables) do 1212 &zip_list(enumerables, &1, &2, zip_fun) 1213 else 1214 &zip_enum(enumerables, &1, &2, zip_fun) 1215 end 1216 end 1217 1218 defp zip_list(_enumerables, {:halt, acc}, _fun, _zip_fun) do 1219 {:halted, acc} 1220 end 1221 1222 defp zip_list(enumerables, {:suspend, acc}, fun, zip_fun) do 1223 {:suspended, acc, &zip_list(enumerables, &1, fun, zip_fun)} 1224 end 1225 1226 defp zip_list(enumerables, {:cont, acc}, fun, zip_fun) do 1227 case zip_list_heads_tails(enumerables, [], []) do 1228 {heads, tails} -> zip_list(tails, fun.(zip_fun.(heads), acc), fun, zip_fun) 1229 :error -> {:done, acc} 1230 end 1231 end 1232 1233 defp zip_list_heads_tails([[head | tail] | rest], heads, tails) do 1234 zip_list_heads_tails(rest, [head | heads], [tail | tails]) 1235 end 1236 1237 defp zip_list_heads_tails([[] | _rest], _heads, _tails) do 1238 :error 1239 end 1240 1241 defp zip_list_heads_tails([], heads, tails) do 1242 {:lists.reverse(heads), :lists.reverse(tails)} 1243 end 1244 1245 defp zip_enum(enumerables, acc, fun, zip_fun) do 1246 step = fn x, acc -> 1247 {:suspend, :lists.reverse([x | acc])} 1248 end 1249 1250 enum_funs = 1251 Enum.map(enumerables, fn enum -> 1252 {&Enumerable.reduce(enum, &1, step), [], :cont} 1253 end) 1254 1255 do_zip_enum(enum_funs, acc, fun, zip_fun) 1256 end 1257 1258 # This implementation of do_zip_enum/4 works for any number of streams to zip 1259 defp do_zip_enum(zips, {:halt, acc}, _fun, _zip_fun) do 1260 do_zip_close(zips) 1261 {:halted, acc} 1262 end 1263 1264 defp do_zip_enum(zips, {:suspend, acc}, fun, zip_fun) do 1265 {:suspended, acc, &do_zip_enum(zips, &1, fun, zip_fun)} 1266 end 1267 1268 defp do_zip_enum([], {:cont, acc}, _callback, _zip_fun) do 1269 {:done, acc} 1270 end 1271 1272 defp do_zip_enum(zips, {:cont, acc}, callback, zip_fun) do 1273 try do 1274 do_zip_next(zips, acc, callback, [], [], zip_fun) 1275 catch 1276 kind, reason -> 1277 do_zip_close(zips) 1278 :erlang.raise(kind, reason, __STACKTRACE__) 1279 else 1280 {:next, buffer, acc} -> 1281 do_zip_enum(buffer, acc, callback, zip_fun) 1282 1283 {:done, _acc} = other -> 1284 other 1285 end 1286 end 1287 1288 # do_zip_next/6 computes the next tuple formed by 1289 # the next element of each zipped stream. 1290 defp do_zip_next( 1291 [{_, [], :halt} | zips], 1292 acc, 1293 _callback, 1294 _yielded_elems, 1295 buffer, 1296 _zip_fun 1297 ) do 1298 do_zip_close(:lists.reverse(buffer, zips)) 1299 {:done, acc} 1300 end 1301 1302 defp do_zip_next([{fun, [], :cont} | zips], acc, callback, yielded_elems, buffer, zip_fun) do 1303 case fun.({:cont, []}) do 1304 {:suspended, [elem | next_acc], fun} -> 1305 next_buffer = [{fun, next_acc, :cont} | buffer] 1306 do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun) 1307 1308 {_, [elem | next_acc]} -> 1309 next_buffer = [{fun, next_acc, :halt} | buffer] 1310 do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun) 1311 1312 {_, []} -> 1313 # The current zipped stream terminated, so we close all the streams 1314 # and return {:halted, acc} (which is returned as is by do_zip/3). 1315 do_zip_close(:lists.reverse(buffer, zips)) 1316 {:done, acc} 1317 end 1318 end 1319 1320 defp do_zip_next( 1321 [{fun, zip_acc, zip_op} | zips], 1322 acc, 1323 callback, 1324 yielded_elems, 1325 buffer, 1326 zip_fun 1327 ) do 1328 [elem | rest] = zip_acc 1329 next_buffer = [{fun, rest, zip_op} | buffer] 1330 do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun) 1331 end 1332 1333 defp do_zip_next([] = _zips, acc, callback, yielded_elems, buffer, zip_fun) do 1334 # "yielded_elems" is a reversed list of results for the current iteration of 1335 # zipping. That is to say, the nth element from each of the enums being zipped. 1336 # It needs to be reversed and passed to the zipping function so it can do it's thing. 1337 {:next, :lists.reverse(buffer), callback.(zip_fun.(:lists.reverse(yielded_elems)), acc)} 1338 end 1339 1340 defp do_zip_close(zips) do 1341 :lists.foreach(fn {fun, _, _} -> fun.({:halt, []}) end, zips) 1342 end 1343 1344 ## Sources 1345 1346 @doc """ 1347 Creates a stream that cycles through the given enumerable, 1348 infinitely. 1349 1350 ## Examples 1351 1352 iex> stream = Stream.cycle([1, 2, 3]) 1353 iex> Enum.take(stream, 5) 1354 [1, 2, 3, 1, 2] 1355 1356 """ 1357 @spec cycle(Enumerable.t()) :: Enumerable.t() 1358 def cycle(enumerable) 1359 1360 def cycle([]) do 1361 raise ArgumentError, "cannot cycle over an empty enumerable" 1362 end 1363 1364 def cycle(enumerable) when is_list(enumerable) do 1365 unfold({enumerable, enumerable}, fn 1366 {source, [h | t]} -> {h, {source, t}} 1367 {source = [h | t], []} -> {h, {source, t}} 1368 end) 1369 end 1370 1371 def cycle(enumerable) do 1372 fn acc, fun -> 1373 step = &do_cycle_step(&1, &2) 1374 cycle = &Enumerable.reduce(enumerable, &1, step) 1375 reduce = check_cycle_first_element(cycle) 1376 do_cycle(reduce, [], cycle, acc, fun) 1377 end 1378 end 1379 1380 defp do_cycle(reduce, inner_acc, _cycle, {:halt, acc}, _fun) do 1381 reduce.({:halt, inner_acc}) 1382 {:halted, acc} 1383 end 1384 1385 defp do_cycle(reduce, inner_acc, cycle, {:suspend, acc}, fun) do 1386 {:suspended, acc, &do_cycle(reduce, inner_acc, cycle, &1, fun)} 1387 end 1388 1389 defp do_cycle(reduce, inner_acc, cycle, {:cont, acc}, fun) do 1390 case reduce.({:cont, inner_acc}) do 1391 {:suspended, [element], new_reduce} -> 1392 do_cycle(new_reduce, inner_acc, cycle, fun.(element, acc), fun) 1393 1394 {_, [element]} -> 1395 do_cycle(cycle, [], cycle, fun.(element, acc), fun) 1396 1397 {_, []} -> 1398 do_cycle(cycle, [], cycle, {:cont, acc}, fun) 1399 end 1400 end 1401 1402 defp do_cycle_step(x, acc) do 1403 {:suspend, [x | acc]} 1404 end 1405 1406 defp check_cycle_first_element(reduce) do 1407 fn acc -> 1408 case reduce.(acc) do 1409 {state, []} when state in [:done, :halted] -> 1410 raise ArgumentError, "cannot cycle over an empty enumerable" 1411 1412 other -> 1413 other 1414 end 1415 end 1416 end 1417 1418 @doc """ 1419 Emits a sequence of values, starting with `start_value`. Successive 1420 values are generated by calling `next_fun` on the previous value. 1421 1422 ## Examples 1423 1424 iex> Stream.iterate(0, &(&1 + 1)) |> Enum.take(5) 1425 [0, 1, 2, 3, 4] 1426 1427 """ 1428 @spec iterate(element, (element -> element)) :: Enumerable.t() 1429 def iterate(start_value, next_fun) when is_function(next_fun, 1) do 1430 unfold({:ok, start_value}, fn 1431 {:ok, value} -> 1432 {value, {:next, value}} 1433 1434 {:next, value} -> 1435 next = next_fun.(value) 1436 {next, {:next, next}} 1437 end) 1438 end 1439 1440 @doc """ 1441 Returns a stream generated by calling `generator_fun` repeatedly. 1442 1443 ## Examples 1444 1445 # Although not necessary, let's seed the random algorithm 1446 iex> :rand.seed(:exsss, {1, 2, 3}) 1447 iex> Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3) 1448 [0.5455598952593053, 0.6039309974353404, 0.6684893034823949] 1449 1450 """ 1451 @spec repeatedly((() -> element)) :: Enumerable.t() 1452 def repeatedly(generator_fun) when is_function(generator_fun, 0) do 1453 &do_repeatedly(generator_fun, &1, &2) 1454 end 1455 1456 defp do_repeatedly(generator_fun, {:suspend, acc}, fun) do 1457 {:suspended, acc, &do_repeatedly(generator_fun, &1, fun)} 1458 end 1459 1460 defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do 1461 {:halted, acc} 1462 end 1463 1464 defp do_repeatedly(generator_fun, {:cont, acc}, fun) do 1465 do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun) 1466 end 1467 1468 @doc """ 1469 Emits a sequence of values for the given resource. 1470 1471 Similar to `transform/3` but the initial accumulated value is 1472 computed lazily via `start_fun` and executes an `after_fun` at 1473 the end of enumeration (both in cases of success and failure). 1474 1475 Successive values are generated by calling `next_fun` with the 1476 previous accumulator (the initial value being the result returned 1477 by `start_fun`) and it must return a tuple containing a list 1478 of elements to be emitted and the next accumulator. The enumeration 1479 finishes if it returns `{:halt, acc}`. 1480 1481 As the name says, this function is useful to stream values from 1482 resources. 1483 1484 ## Examples 1485 1486 Stream.resource( 1487 fn -> File.open!("sample") end, 1488 fn file -> 1489 case IO.read(file, :line) do 1490 data when is_binary(data) -> {[data], file} 1491 _ -> {:halt, file} 1492 end 1493 end, 1494 fn file -> File.close(file) end 1495 ) 1496 1497 iex> Stream.resource( 1498 ...> fn -> 1499 ...> {:ok, pid} = StringIO.open("string") 1500 ...> pid 1501 ...> end, 1502 ...> fn pid -> 1503 ...> case IO.getn(pid, "", 1) do 1504 ...> :eof -> {:halt, pid} 1505 ...> char -> {[char], pid} 1506 ...> end 1507 ...> end, 1508 ...> fn pid -> StringIO.close(pid) end 1509 ...> ) |> Enum.to_list() 1510 ["s", "t", "r", "i", "n", "g"] 1511 1512 """ 1513 @spec resource((() -> acc), (acc -> {[element], acc} | {:halt, acc}), (acc -> term)) :: 1514 Enumerable.t() 1515 def resource(start_fun, next_fun, after_fun) 1516 when is_function(start_fun, 0) and is_function(next_fun, 1) and is_function(after_fun, 1) do 1517 &do_resource(start_fun.(), next_fun, &1, &2, after_fun) 1518 end 1519 1520 defp do_resource(next_acc, next_fun, {:suspend, acc}, fun, after_fun) do 1521 {:suspended, acc, &do_resource(next_acc, next_fun, &1, fun, after_fun)} 1522 end 1523 1524 defp do_resource(next_acc, _next_fun, {:halt, acc}, _fun, after_fun) do 1525 after_fun.(next_acc) 1526 {:halted, acc} 1527 end 1528 1529 defp do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun) do 1530 try do 1531 next_fun.(next_acc) 1532 catch 1533 kind, reason -> 1534 after_fun.(next_acc) 1535 :erlang.raise(kind, reason, __STACKTRACE__) 1536 else 1537 {:halt, next_acc} -> 1538 do_resource(next_acc, next_fun, {:halt, acc}, fun, after_fun) 1539 1540 {[], next_acc} -> 1541 do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun) 1542 1543 {[v], next_acc} -> 1544 do_element_resource(next_acc, next_fun, acc, fun, after_fun, v) 1545 1546 {list, next_acc} when is_list(list) -> 1547 reduce = &Enumerable.List.reduce(list, &1, fun) 1548 do_list_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun, reduce) 1549 1550 {enum, next_acc} -> 1551 inner = &do_resource_each(&1, &2, fun) 1552 reduce = &Enumerable.reduce(enum, &1, inner) 1553 do_enum_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun, reduce) 1554 end 1555 end 1556 1557 defp do_element_resource(next_acc, next_fun, acc, fun, after_fun, v) do 1558 try do 1559 fun.(v, acc) 1560 catch 1561 kind, reason -> 1562 after_fun.(next_acc) 1563 :erlang.raise(kind, reason, __STACKTRACE__) 1564 else 1565 acc -> 1566 do_resource(next_acc, next_fun, acc, fun, after_fun) 1567 end 1568 end 1569 1570 defp do_list_resource(next_acc, next_fun, acc, fun, after_fun, reduce) do 1571 try do 1572 reduce.(acc) 1573 catch 1574 kind, reason -> 1575 after_fun.(next_acc) 1576 :erlang.raise(kind, reason, __STACKTRACE__) 1577 else 1578 {:done, acc} -> 1579 do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun) 1580 1581 {:halted, acc} -> 1582 do_resource(next_acc, next_fun, {:halt, acc}, fun, after_fun) 1583 1584 {:suspended, acc, c} -> 1585 {:suspended, acc, &do_list_resource(next_acc, next_fun, &1, fun, after_fun, c)} 1586 end 1587 end 1588 1589 defp do_enum_resource(next_acc, next_fun, {op, acc}, fun, after_fun, reduce) do 1590 try do 1591 reduce.({op, [:outer | acc]}) 1592 catch 1593 kind, reason -> 1594 after_fun.(next_acc) 1595 :erlang.raise(kind, reason, __STACKTRACE__) 1596 else 1597 {:halted, [:outer | acc]} -> 1598 do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun) 1599 1600 {:halted, [:inner | acc]} -> 1601 do_resource(next_acc, next_fun, {:halt, acc}, fun, after_fun) 1602 1603 {:done, [_ | acc]} -> 1604 do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun) 1605 1606 {:suspended, [_ | acc], c} -> 1607 {:suspended, acc, &do_enum_resource(next_acc, next_fun, &1, fun, after_fun, c)} 1608 end 1609 end 1610 1611 defp do_resource_each(x, [:outer | acc], f) do 1612 case f.(x, acc) do 1613 {:halt, res} -> {:halt, [:inner | res]} 1614 {op, res} -> {op, [:outer | res]} 1615 end 1616 end 1617 1618 @doc """ 1619 Emits a sequence of values for the given accumulator. 1620 1621 Successive values are generated by calling `next_fun` with the previous 1622 accumulator and it must return a tuple with the current value and next 1623 accumulator. The enumeration finishes if it returns `nil`. 1624 1625 ## Examples 1626 1627 iex> Stream.unfold(5, fn 1628 ...> 0 -> nil 1629 ...> n -> {n, n - 1} 1630 ...> end) |> Enum.to_list() 1631 [5, 4, 3, 2, 1] 1632 1633 """ 1634 @spec unfold(acc, (acc -> {element, acc} | nil)) :: Enumerable.t() 1635 def unfold(next_acc, next_fun) when is_function(next_fun, 1) do 1636 &do_unfold(next_acc, next_fun, &1, &2) 1637 end 1638 1639 defp do_unfold(next_acc, next_fun, {:suspend, acc}, fun) do 1640 {:suspended, acc, &do_unfold(next_acc, next_fun, &1, fun)} 1641 end 1642 1643 defp do_unfold(_next_acc, _next_fun, {:halt, acc}, _fun) do 1644 {:halted, acc} 1645 end 1646 1647 defp do_unfold(next_acc, next_fun, {:cont, acc}, fun) do 1648 case next_fun.(next_acc) do 1649 nil -> {:done, acc} 1650 {v, next_acc} -> do_unfold(next_acc, next_fun, fun.(v, acc), fun) 1651 end 1652 end 1653 1654 @doc """ 1655 Lazily intersperses `intersperse_element` between each element of the enumeration. 1656 1657 ## Examples 1658 1659 iex> Stream.intersperse([1, 2, 3], 0) |> Enum.to_list() 1660 [1, 0, 2, 0, 3] 1661 1662 iex> Stream.intersperse([1], 0) |> Enum.to_list() 1663 [1] 1664 1665 iex> Stream.intersperse([], 0) |> Enum.to_list() 1666 [] 1667 1668 """ 1669 @doc since: "1.6.0" 1670 @spec intersperse(Enumerable.t(), any) :: Enumerable.t() 1671 def intersperse(enumerable, intersperse_element) do 1672 Stream.transform(enumerable, false, fn 1673 element, true -> {[intersperse_element, element], true} 1674 element, false -> {[element], true} 1675 end) 1676 end 1677 1678 ## Helpers 1679 1680 @compile {:inline, lazy: 2, lazy: 3, lazy: 4} 1681 1682 defp lazy(%Stream{done: nil, funs: funs} = lazy, fun), do: %{lazy | funs: [fun | funs]} 1683 defp lazy(enum, fun), do: %Stream{enum: enum, funs: [fun]} 1684 1685 defp lazy(%Stream{done: nil, funs: funs, accs: accs} = lazy, acc, fun), 1686 do: %{lazy | funs: [fun | funs], accs: [acc | accs]} 1687 1688 defp lazy(enum, acc, fun), do: %Stream{enum: enum, funs: [fun], accs: [acc]} 1689 1690 defp lazy(%Stream{done: nil, funs: funs, accs: accs} = lazy, acc, fun, done), 1691 do: %{lazy | funs: [fun | funs], accs: [acc | accs], done: done} 1692 1693 defp lazy(enum, acc, fun, done), do: %Stream{enum: enum, funs: [fun], accs: [acc], done: done} 1694end 1695 1696defimpl Enumerable, for: Stream do 1697 @compile :inline_list_funcs 1698 1699 def count(_lazy), do: {:error, __MODULE__} 1700 1701 def member?(_lazy, _value), do: {:error, __MODULE__} 1702 1703 def slice(_lazy), do: {:error, __MODULE__} 1704 1705 def reduce(lazy, acc, fun) do 1706 do_reduce(lazy, acc, fn x, [acc] -> 1707 {reason, acc} = fun.(x, acc) 1708 {reason, [acc]} 1709 end) 1710 end 1711 1712 defp do_reduce(%Stream{enum: enum, funs: funs, accs: accs, done: done}, acc, fun) do 1713 composed = :lists.foldl(fn entry_fun, acc -> entry_fun.(acc) end, fun, funs) 1714 reduce = &Enumerable.reduce(enum, &1, composed) 1715 do_each(reduce, done && {done, fun}, :lists.reverse(accs), acc) 1716 end 1717 1718 defp do_each(reduce, done, accs, {command, acc}) do 1719 case reduce.({command, [acc | accs]}) do 1720 {:suspended, [acc | accs], continuation} -> 1721 {:suspended, acc, &do_each(continuation, done, accs, &1)} 1722 1723 {:halted, accs} -> 1724 do_done({:halted, accs}, done) 1725 1726 {:done, accs} -> 1727 do_done({:done, accs}, done) 1728 end 1729 end 1730 1731 defp do_done({reason, [acc | _]}, nil), do: {reason, acc} 1732 1733 defp do_done({reason, [acc | t]}, {done, fun}) do 1734 [h | _] = :lists.reverse(t) 1735 1736 case done.([acc, h], fun) do 1737 {:cont, [acc | _]} -> {reason, acc} 1738 {:halt, [acc | _]} -> {:halted, acc} 1739 {:suspend, [acc | _]} -> {:suspended, acc, &{:done, elem(&1, 1)}} 1740 end 1741 end 1742end 1743 1744defimpl Inspect, for: Stream do 1745 import Inspect.Algebra 1746 1747 def inspect(%{enum: enum, funs: funs}, opts) do 1748 inner = [enum: enum, funs: :lists.reverse(funs)] 1749 concat(["#Stream<", to_doc(inner, opts), ">"]) 1750 end 1751end 1752