1defmodule Stream do
2  @moduledoc """
3  Functions for creating and composing streams.
4
5  Streams are composable, lazy enumerables (for an introduction on
6  enumerables, see the `Enum` module). Any enumerable that generates
7  elements one by one during enumeration is called a stream. For example,
8  Elixir's `Range` is a stream:
9
10      iex> range = 1..5
11      1..5
12      iex> Enum.map(range, &(&1 * 2))
13      [2, 4, 6, 8, 10]
14
15  In the example above, as we mapped over the range, the elements being
16  enumerated were created one by one, during enumeration. The `Stream`
17  module allows us to map the range, without triggering its enumeration:
18
19      iex> range = 1..3
20      iex> stream = Stream.map(range, &(&1 * 2))
21      iex> Enum.map(stream, &(&1 + 1))
22      [3, 5, 7]
23
24  Note that we started with a range and then we created a stream that is
25  meant to multiply each element in the range by 2. At this point, no
  computation was done. Only when `Enum.map/2` is called do we actually
27  enumerate over each element in the range, multiplying it by 2 and adding 1.
28  We say the functions in `Stream` are *lazy* and the functions in `Enum`
29  are *eager*.
30
31  Due to their laziness, streams are useful when working with large
32  (or even infinite) collections. When chaining many operations with `Enum`,
33  intermediate lists are created, while `Stream` creates a recipe of
34  computations that are executed at a later moment. Let's see another
35  example:
36
37      1..3
38      |> Enum.map(&IO.inspect(&1))
39      |> Enum.map(&(&1 * 2))
40      |> Enum.map(&IO.inspect(&1))
41      1
42      2
43      3
44      2
45      4
46      6
47      #=> [2, 4, 6]
48
49  Note that we first printed each element in the list, then multiplied each
50  element by 2 and finally printed each new value. In this example, the list
51  was enumerated three times. Let's see an example with streams:
52
53      stream = 1..3
54      |> Stream.map(&IO.inspect(&1))
55      |> Stream.map(&(&1 * 2))
56      |> Stream.map(&IO.inspect(&1))
57      Enum.to_list(stream)
58      1
59      2
60      2
61      4
62      3
63      6
64      #=> [2, 4, 6]
65
66  Although the end result is the same, the order in which the elements were
67  printed changed! With streams, we print the first element and then print
68  its double. In this example, the list was enumerated just once!
69
70  That's what we meant when we said earlier that streams are composable,
71  lazy enumerables. Note that we could call `Stream.map/2` multiple times,
72  effectively composing the streams and keeping them lazy. The computations
73  are only performed when you call a function from the `Enum` module.
74
  Like with `Enum`, the functions in this module work in linear time. This
  means that the time it takes to perform an operation grows at the same
  rate as the length of the list. This is expected on operations such as
78  `Stream.map/2`. After all, if we want to traverse every element on a
79  stream, the longer the stream, the more elements we need to traverse,
80  and the longer it will take.
81
82  ## Creating Streams
83
  There are many functions in Elixir's standard library that return
  streams. Some examples are:
86
87    * `IO.stream/2`         - streams input lines, one by one
88    * `URI.query_decoder/1` - decodes a query string, pair by pair
89
90  This module also provides many convenience functions for creating streams,
91  like `Stream.cycle/1`, `Stream.unfold/2`, `Stream.resource/3` and more.
92
93  Note the functions in this module are guaranteed to return enumerables.
94  Since enumerables can have different shapes (structs, anonymous functions,
95  and so on), the functions in this module may return any of those shapes
96  and this may change at any time. For example, a function that today
97  returns an anonymous function may return a struct in future releases.
98  """
99
  # The struct is hidden from the generated docs. Functions in this module
  # build it directly as `%Stream{enum: enum}` for trivial cases, leaving the
  # remaining fields at the defaults declared here.
  # NOTE(review): `funs`, `accs` and `done` are presumably filled in by the
  # `lazy` helpers defined elsewhere in this module - confirm against them.
  @doc false
  defstruct enum: nil, funs: [], accs: [], done: nil

  # Generic aliases used by the specs in this module.
  @type acc :: any
  @type element :: any

  @typedoc "Zero-based index."
  @type index :: non_neg_integer

  @type default :: any
  # Delay accepted by `interval/1` and `timer/1`.
  @type timer :: non_neg_integer | :infinity
111
112  # Require Stream.Reducers and its callbacks
113  require Stream.Reducers, as: R
114
115  defmacrop skip(acc) do
116    {:cont, acc}
117  end
118
119  defmacrop next(fun, entry, acc) do
120    quote(do: unquote(fun).(unquote(entry), unquote(acc)))
121  end
122
123  defmacrop acc(head, state, tail) do
124    quote(do: [unquote(head), unquote(state) | unquote(tail)])
125  end
126
127  defmacrop next_with_acc(fun, entry, head, state, tail) do
128    quote do
129      {reason, [head | tail]} = unquote(fun).(unquote(entry), [unquote(head) | unquote(tail)])
130      {reason, [head, unquote(state) | tail]}
131    end
132  end
133
134  ## Transformers
135
136  @doc false
137  @deprecated "Use Stream.chunk_every/2 instead"
138  def chunk(enum, n), do: chunk(enum, n, n, nil)
139
140  @doc false
141  @deprecated "Use Stream.chunk_every/3 instead"
142  def chunk(enum, n, step) do
143    chunk_every(enum, n, step, nil)
144  end
145
146  @doc false
147  @deprecated "Use Stream.chunk_every/4 instead"
148  def chunk(enum, n, step, leftover)
149      when is_integer(n) and n > 0 and is_integer(step) and step > 0 do
150    chunk_every(enum, n, step, leftover || :discard)
151  end
152
153  @doc """
154  Shortcut to `chunk_every(enum, count, count)`.
155  """
156  @doc since: "1.5.0"
157  @spec chunk_every(Enumerable.t(), pos_integer) :: Enumerable.t()
158  def chunk_every(enum, count), do: chunk_every(enum, count, count, [])
159
160  @doc """
161  Streams the enumerable in chunks, containing `count` elements each,
162  where each new chunk starts `step` elements into the enumerable.
163
164  `step` is optional and, if not passed, defaults to `count`, i.e.
165  chunks do not overlap.
166
167  If the last chunk does not have `count` elements to fill the chunk,
168  elements are taken from `leftover` to fill in the chunk. If `leftover`
169  does not have enough elements to fill the chunk, then a partial chunk
170  is returned with less than `count` elements.
171
172  If `:discard` is given in `leftover`, the last chunk is discarded
173  unless it has exactly `count` elements.
174
175  ## Examples
176
177      iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 2) |> Enum.to_list()
178      [[1, 2], [3, 4], [5, 6]]
179
180      iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, :discard) |> Enum.to_list()
181      [[1, 2, 3], [3, 4, 5]]
182
183      iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7]) |> Enum.to_list()
184      [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
185
186      iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 3, []) |> Enum.to_list()
187      [[1, 2, 3], [4, 5, 6]]
188
189  """
190  @doc since: "1.5.0"
191  @spec chunk_every(Enumerable.t(), pos_integer, pos_integer, Enumerable.t() | :discard) ::
192          Enumerable.t()
193  def chunk_every(enum, count, step, leftover \\ [])
194      when is_integer(count) and count > 0 and is_integer(step) and step > 0 do
195    R.chunk_every(&chunk_while/4, enum, count, step, leftover)
196  end
197
198  @doc """
199  Chunks the `enum` by buffering elements for which `fun` returns the same value.
200
201  Elements are only emitted when `fun` returns a new value or the `enum` finishes.
202
203  ## Examples
204
205      iex> stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
206      iex> Enum.to_list(stream)
207      [[1], [2, 2], [3], [4, 4, 6], [7, 7]]
208
209  """
210  @spec chunk_by(Enumerable.t(), (element -> any)) :: Enumerable.t()
211  def chunk_by(enum, fun) when is_function(fun, 1) do
212    R.chunk_by(&chunk_while/4, enum, fun)
213  end
214
215  @doc """
  Chunks the `enum` with fine-grained control over when each chunk is emitted.
217
  `chunk_fun` receives the current element and the accumulator and
  must return `{:cont, element, acc}` to emit the given chunk and
  continue with accumulator, `{:cont, acc}` to not emit any chunk
  and continue with the returned accumulator, or `{:halt, acc}` to
  halt chunking.
222
223  `after_fun` is invoked when iteration is done and must also return
224  `{:cont, element, acc}` or `{:cont, acc}`.
225
226  ## Examples
227
228      iex> chunk_fun = fn element, acc ->
229      ...>   if rem(element, 2) == 0 do
230      ...>     {:cont, Enum.reverse([element | acc]), []}
231      ...>   else
232      ...>     {:cont, [element | acc]}
233      ...>   end
234      ...> end
235      iex> after_fun = fn
236      ...>   [] -> {:cont, []}
237      ...>   acc -> {:cont, Enum.reverse(acc), []}
238      ...> end
239      iex> stream = Stream.chunk_while(1..10, [], chunk_fun, after_fun)
240      iex> Enum.to_list(stream)
241      [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
242
243  """
244  @doc since: "1.5.0"
245  @spec chunk_while(
246          Enumerable.t(),
247          acc,
248          (element, acc -> {:cont, chunk, acc} | {:cont, acc} | {:halt, acc}),
249          (acc -> {:cont, chunk, acc} | {:cont, acc})
250        ) :: Enumerable.t()
251        when chunk: any
252  def chunk_while(enum, acc, chunk_fun, after_fun)
253      when is_function(chunk_fun, 2) and is_function(after_fun, 1) do
254    lazy(
255      enum,
256      [acc | after_fun],
257      fn f1 -> chunk_while_fun(chunk_fun, f1) end,
258      &after_chunk_while/2
259    )
260  end
261
262  defp chunk_while_fun(callback, fun) do
263    fn entry, acc(head, [acc | after_fun], tail) ->
264      case callback.(entry, acc) do
265        {:cont, emit, acc} ->
266          # If we emit an element and then we have to halt,
267          # we need to disable the after_fun callback to
268          # avoid emitting even more elements.
269          case next(fun, emit, [head | tail]) do
270            {:halt, [head | tail]} -> {:halt, acc(head, [acc | &{:cont, &1}], tail)}
271            {command, [head | tail]} -> {command, acc(head, [acc | after_fun], tail)}
272          end
273
274        {:cont, acc} ->
275          skip(acc(head, [acc | after_fun], tail))
276
277        {:halt, acc} ->
278          {:halt, acc(head, [acc | after_fun], tail)}
279      end
280    end
281  end
282
283  defp after_chunk_while(acc(h, [acc | after_fun], t), f1) do
284    case after_fun.(acc) do
285      {:cont, emit, acc} -> next_with_acc(f1, emit, h, [acc | after_fun], t)
286      {:cont, acc} -> {:cont, acc(h, [acc | after_fun], t)}
287    end
288  end
289
290  @doc """
291  Creates a stream that only emits elements if they are different from the last emitted element.
292
293  This function only ever needs to store the last emitted element.
294
295  Elements are compared using `===/2`.
296
297  ## Examples
298
299      iex> Stream.dedup([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
300      [1, 2, 3, 2, 1]
301
302  """
303  @spec dedup(Enumerable.t()) :: Enumerable.t()
304  def dedup(enum) do
305    dedup_by(enum, fn x -> x end)
306  end
307
308  @doc """
309  Creates a stream that only emits elements if the result of calling `fun` on the element is
310  different from the (stored) result of calling `fun` on the last emitted element.
311
312  ## Examples
313
314      iex> Stream.dedup_by([{1, :x}, {2, :y}, {2, :z}, {1, :x}], fn {x, _} -> x end) |> Enum.to_list()
315      [{1, :x}, {2, :y}, {1, :x}]
316
317  """
318  @spec dedup_by(Enumerable.t(), (element -> term)) :: Enumerable.t()
319  def dedup_by(enum, fun) when is_function(fun, 1) do
320    lazy(enum, nil, fn f1 -> R.dedup(fun, f1) end)
321  end
322
323  @doc """
324  Lazily drops the next `n` elements from the enumerable.
325
326  If a negative `n` is given, it will drop the last `n` elements from
327  the collection. Note that the mechanism by which this is implemented
328  will delay the emission of any element until `n` additional elements have
329  been emitted by the enum.
330
331  ## Examples
332
333      iex> stream = Stream.drop(1..10, 5)
334      iex> Enum.to_list(stream)
335      [6, 7, 8, 9, 10]
336
337      iex> stream = Stream.drop(1..10, -5)
338      iex> Enum.to_list(stream)
339      [1, 2, 3, 4, 5]
340
341  """
342  @spec drop(Enumerable.t(), integer) :: Enumerable.t()
343  def drop(enum, n) when is_integer(n) and n >= 0 do
344    lazy(enum, n, fn f1 -> R.drop(f1) end)
345  end
346
347  def drop(enum, n) when is_integer(n) and n < 0 do
348    n = abs(n)
349
350    lazy(enum, {0, [], []}, fn f1 ->
351      fn
352        entry, [h, {count, buf1, []} | t] ->
353          do_drop(:cont, n, entry, h, count, buf1, [], t)
354
355        entry, [h, {count, buf1, [next | buf2]} | t] ->
356          {reason, [h | t]} = f1.(next, [h | t])
357          do_drop(reason, n, entry, h, count, buf1, buf2, t)
358      end
359    end)
360  end
361
362  defp do_drop(reason, n, entry, h, count, buf1, buf2, t) do
363    buf1 = [entry | buf1]
364    count = count + 1
365
366    if count == n do
367      {reason, [h, {0, [], :lists.reverse(buf1)} | t]}
368    else
369      {reason, [h, {count, buf1, buf2} | t]}
370    end
371  end
372
373  @doc """
374  Creates a stream that drops every `nth` element from the enumerable.
375
376  The first element is always dropped, unless `nth` is 0.
377
378  `nth` must be a non-negative integer.
379
380  ## Examples
381
382      iex> stream = Stream.drop_every(1..10, 2)
383      iex> Enum.to_list(stream)
384      [2, 4, 6, 8, 10]
385
386      iex> stream = Stream.drop_every(1..1000, 1)
387      iex> Enum.to_list(stream)
388      []
389
390      iex> stream = Stream.drop_every([1, 2, 3, 4, 5], 0)
391      iex> Enum.to_list(stream)
392      [1, 2, 3, 4, 5]
393
394  """
395  @spec drop_every(Enumerable.t(), non_neg_integer) :: Enumerable.t()
396  def drop_every(enum, nth)
397  def drop_every(enum, 0), do: %Stream{enum: enum}
398  def drop_every([], _nth), do: %Stream{enum: []}
399
400  def drop_every(enum, nth) when is_integer(nth) and nth > 0 do
401    lazy(enum, nth, fn f1 -> R.drop_every(nth, f1) end)
402  end
403
404  @doc """
405  Lazily drops elements of the enumerable while the given
406  function returns a truthy value.
407
408  ## Examples
409
410      iex> stream = Stream.drop_while(1..10, &(&1 <= 5))
411      iex> Enum.to_list(stream)
412      [6, 7, 8, 9, 10]
413
414  """
415  @spec drop_while(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t()
416  def drop_while(enum, fun) when is_function(fun, 1) do
417    lazy(enum, true, fn f1 -> R.drop_while(fun, f1) end)
418  end
419
420  @doc """
421  Executes the given function for each element.
422
423  Useful for adding side effects (like printing) to a stream.
424
425  ## Examples
426
427      iex> stream = Stream.each([1, 2, 3], fn x -> send(self(), x) end)
428      iex> Enum.to_list(stream)
429      iex> receive do: (x when is_integer(x) -> x)
430      1
431      iex> receive do: (x when is_integer(x) -> x)
432      2
433      iex> receive do: (x when is_integer(x) -> x)
434      3
435
436  """
437  @spec each(Enumerable.t(), (element -> term)) :: Enumerable.t()
438  def each(enum, fun) when is_function(fun, 1) do
439    lazy(enum, fn f1 ->
440      fn x, acc ->
441        fun.(x)
442        f1.(x, acc)
443      end
444    end)
445  end
446
447  @doc """
448  Maps the given `fun` over `enumerable` and flattens the result.
449
450  This function returns a new stream built by appending the result of invoking `fun`
451  on each element of `enumerable` together.
452
453  ## Examples
454
455      iex> stream = Stream.flat_map([1, 2, 3], fn x -> [x, x * 2] end)
456      iex> Enum.to_list(stream)
457      [1, 2, 2, 4, 3, 6]
458
459      iex> stream = Stream.flat_map([1, 2, 3], fn x -> [[x]] end)
460      iex> Enum.to_list(stream)
461      [[1], [2], [3]]
462
463  """
464  @spec flat_map(Enumerable.t(), (element -> Enumerable.t())) :: Enumerable.t()
465  def flat_map(enum, mapper) when is_function(mapper, 1) do
466    transform(enum, nil, fn val, nil -> {mapper.(val), nil} end)
467  end
468
469  @doc """
470  Creates a stream that filters elements according to
471  the given function on enumeration.
472
473  ## Examples
474
475      iex> stream = Stream.filter([1, 2, 3], fn x -> rem(x, 2) == 0 end)
476      iex> Enum.to_list(stream)
477      [2]
478
479  """
480  @spec filter(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t()
481  def filter(enum, fun) when is_function(fun, 1) do
482    lazy(enum, fn f1 -> R.filter(fun, f1) end)
483  end
484
485  @doc false
486  @deprecated "Use Stream.filter/2 + Stream.map/2 instead"
487  def filter_map(enum, filter, mapper) do
488    lazy(enum, fn f1 -> R.filter_map(filter, mapper, f1) end)
489  end
490
491  @doc """
492  Creates a stream that emits a value after the given period `n`
493  in milliseconds.
494
495  The values emitted are an increasing counter starting at `0`.
496  This operation will block the caller by the given interval
497  every time a new element is streamed.
498
499  Do not use this function to generate a sequence of numbers.
500  If blocking the caller process is not necessary, use
501  `Stream.iterate(0, & &1 + 1)` instead.
502
503  ## Examples
504
505      iex> Stream.interval(10) |> Enum.take(10)
506      [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
507
508  """
509  @spec interval(timer()) :: Enumerable.t()
510  def interval(n)
511      when is_integer(n) and n >= 0
512      when n == :infinity do
513    unfold(0, fn count ->
514      Process.sleep(n)
515      {count, count + 1}
516    end)
517  end
518
519  @doc """
520  Injects the stream values into the given collectable as a side-effect.
521
522  This function is often used with `run/1` since any evaluation
523  is delayed until the stream is executed. See `run/1` for an example.
524  """
525  @spec into(Enumerable.t(), Collectable.t(), (term -> term)) :: Enumerable.t()
526  def into(enum, collectable, transform \\ fn x -> x end) when is_function(transform, 1) do
527    &do_into(enum, collectable, transform, &1, &2)
528  end
529
530  defp do_into(enum, collectable, transform, acc, fun) do
531    {initial, into} = Collectable.into(collectable)
532
533    composed = fn x, [acc | collectable] ->
534      collectable = into.(collectable, {:cont, transform.(x)})
535      {reason, acc} = fun.(x, acc)
536      {reason, [acc | collectable]}
537    end
538
539    do_into(&Enumerable.reduce(enum, &1, composed), initial, into, acc)
540  end
541
542  defp do_into(reduce, collectable, into, {command, acc}) do
543    try do
544      reduce.({command, [acc | collectable]})
545    catch
546      kind, reason ->
547        into.(collectable, :halt)
548        :erlang.raise(kind, reason, __STACKTRACE__)
549    else
550      {:suspended, [acc | collectable], continuation} ->
551        {:suspended, acc, &do_into(continuation, collectable, into, &1)}
552
553      {reason, [acc | collectable]} ->
554        into.(collectable, :done)
555        {reason, acc}
556    end
557  end
558
559  @doc """
560  Creates a stream that will apply the given function on
561  enumeration.
562
563  ## Examples
564
565      iex> stream = Stream.map([1, 2, 3], fn x -> x * 2 end)
566      iex> Enum.to_list(stream)
567      [2, 4, 6]
568
569  """
570  @spec map(Enumerable.t(), (element -> any)) :: Enumerable.t()
571  def map(enum, fun) when is_function(fun, 1) do
572    lazy(enum, fn f1 -> R.map(fun, f1) end)
573  end
574
575  @doc """
576  Creates a stream that will apply the given function on
577  every `nth` element from the enumerable.
578
579  The first element is always passed to the given function.
580
581  `nth` must be a non-negative integer.
582
583  ## Examples
584
585      iex> stream = Stream.map_every(1..10, 2, fn x -> x * 2 end)
586      iex> Enum.to_list(stream)
587      [2, 2, 6, 4, 10, 6, 14, 8, 18, 10]
588
589      iex> stream = Stream.map_every([1, 2, 3, 4, 5], 1, fn x -> x * 2 end)
590      iex> Enum.to_list(stream)
591      [2, 4, 6, 8, 10]
592
593      iex> stream = Stream.map_every(1..5, 0, fn x -> x * 2 end)
594      iex> Enum.to_list(stream)
595      [1, 2, 3, 4, 5]
596
597  """
598  @doc since: "1.4.0"
599  @spec map_every(Enumerable.t(), non_neg_integer, (element -> any)) :: Enumerable.t()
600  def map_every(enum, nth, fun) when is_integer(nth) and nth >= 0 and is_function(fun, 1) do
601    map_every_after_guards(enum, nth, fun)
602  end
603
604  defp map_every_after_guards(enum, 1, fun), do: map(enum, fun)
605  defp map_every_after_guards(enum, 0, _fun), do: %Stream{enum: enum}
606  defp map_every_after_guards([], _nth, _fun), do: %Stream{enum: []}
607
608  defp map_every_after_guards(enum, nth, fun) do
609    lazy(enum, nth, fn f1 -> R.map_every(nth, fun, f1) end)
610  end
611
612  @doc """
613  Creates a stream that will reject elements according to
614  the given function on enumeration.
615
616  ## Examples
617
618      iex> stream = Stream.reject([1, 2, 3], fn x -> rem(x, 2) == 0 end)
619      iex> Enum.to_list(stream)
620      [1, 3]
621
622  """
623  @spec reject(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t()
624  def reject(enum, fun) when is_function(fun, 1) do
625    lazy(enum, fn f1 -> R.reject(fun, f1) end)
626  end
627
628  @doc """
629  Runs the given stream.
630
631  This is useful when a stream needs to be run, for side effects,
632  and there is no interest in its return result.
633
634  ## Examples
635
636  Open up a file, replace all `#` by `%` and stream to another file
637  without loading the whole file in memory:
638
639      File.stream!("/path/to/file")
640      |> Stream.map(&String.replace(&1, "#", "%"))
641      |> Stream.into(File.stream!("/path/to/other/file"))
642      |> Stream.run()
643
644  No computation will be done until we call one of the `Enum` functions
645  or `run/1`.
646  """
647  @spec run(Enumerable.t()) :: :ok
648  def run(stream) do
649    _ = Enumerable.reduce(stream, {:cont, nil}, fn _, _ -> {:cont, nil} end)
650    :ok
651  end
652
653  @doc """
654  Creates a stream that applies the given function to each
655  element, emits the result and uses the same result as the accumulator
656  for the next computation. Uses the first element in the enumerable
657  as the starting value.
658
659  ## Examples
660
661      iex> stream = Stream.scan(1..5, &(&1 + &2))
662      iex> Enum.to_list(stream)
663      [1, 3, 6, 10, 15]
664
665  """
666  @spec scan(Enumerable.t(), (element, acc -> any)) :: Enumerable.t()
667  def scan(enum, fun) when is_function(fun, 2) do
668    lazy(enum, :first, fn f1 -> R.scan2(fun, f1) end)
669  end
670
671  @doc """
672  Creates a stream that applies the given function to each
673  element, emits the result and uses the same result as the accumulator
674  for the next computation. Uses the given `acc` as the starting value.
675
676  ## Examples
677
678      iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
679      iex> Enum.to_list(stream)
680      [1, 3, 6, 10, 15]
681
682  """
683  @spec scan(Enumerable.t(), acc, (element, acc -> any)) :: Enumerable.t()
684  def scan(enum, acc, fun) when is_function(fun, 2) do
685    lazy(enum, acc, fn f1 -> R.scan3(fun, f1) end)
686  end
687
688  @doc """
689  Lazily takes the next `count` elements from the enumerable and stops
690  enumeration.
691
692  If a negative `count` is given, the last `count` values will be taken.
693  For such, the collection is fully enumerated keeping up to `2 * count`
  elements in memory. Once the end of the collection is reached,
  the last `count` elements will be emitted. Therefore, using
696  a negative `count` on an infinite collection will never return.
697
698  ## Examples
699
700      iex> stream = Stream.take(1..100, 5)
701      iex> Enum.to_list(stream)
702      [1, 2, 3, 4, 5]
703
704      iex> stream = Stream.take(1..100, -5)
705      iex> Enum.to_list(stream)
706      [96, 97, 98, 99, 100]
707
708      iex> stream = Stream.cycle([1, 2, 3]) |> Stream.take(5)
709      iex> Enum.to_list(stream)
710      [1, 2, 3, 1, 2]
711
712  """
713  @spec take(Enumerable.t(), integer) :: Enumerable.t()
714  def take(enum, count) when is_integer(count) do
715    take_after_guards(enum, count)
716  end
717
718  defp take_after_guards(_enum, 0), do: %Stream{enum: []}
719
720  defp take_after_guards([], _count), do: %Stream{enum: []}
721
722  defp take_after_guards(enum, count) when count > 0 do
723    lazy(enum, count, fn f1 -> R.take(f1) end)
724  end
725
726  defp take_after_guards(enum, count) when count < 0 do
727    &Enumerable.reduce(Enum.take(enum, count), &1, &2)
728  end
729
730  @doc """
731  Creates a stream that takes every `nth` element from the enumerable.
732
733  The first element is always included, unless `nth` is 0.
734
735  `nth` must be a non-negative integer.
736
737  ## Examples
738
739      iex> stream = Stream.take_every(1..10, 2)
740      iex> Enum.to_list(stream)
741      [1, 3, 5, 7, 9]
742
743      iex> stream = Stream.take_every([1, 2, 3, 4, 5], 1)
744      iex> Enum.to_list(stream)
745      [1, 2, 3, 4, 5]
746
747      iex> stream = Stream.take_every(1..1000, 0)
748      iex> Enum.to_list(stream)
749      []
750
751  """
752  @spec take_every(Enumerable.t(), non_neg_integer) :: Enumerable.t()
753  def take_every(enum, nth) when is_integer(nth) and nth >= 0 do
754    take_every_after_guards(enum, nth)
755  end
756
757  defp take_every_after_guards(_enum, 0), do: %Stream{enum: []}
758
759  defp take_every_after_guards([], _nth), do: %Stream{enum: []}
760
761  defp take_every_after_guards(enum, nth) do
762    lazy(enum, nth, fn f1 -> R.take_every(nth, f1) end)
763  end
764
765  @doc """
766  Lazily takes elements of the enumerable while the given
767  function returns a truthy value.
768
769  ## Examples
770
771      iex> stream = Stream.take_while(1..100, &(&1 <= 5))
772      iex> Enum.to_list(stream)
773      [1, 2, 3, 4, 5]
774
775  """
776  @spec take_while(Enumerable.t(), (element -> as_boolean(term))) :: Enumerable.t()
777  def take_while(enum, fun) when is_function(fun, 1) do
778    lazy(enum, fn f1 -> R.take_while(fun, f1) end)
779  end
780
781  @doc """
782  Creates a stream that emits a single value after `n` milliseconds.
783
784  The value emitted is `0`. This operation will block the caller by
785  the given time until the element is streamed.
786
787  ## Examples
788
789      iex> Stream.timer(10) |> Enum.to_list()
790      [0]
791
792  """
793  @spec timer(timer()) :: Enumerable.t()
794  def timer(n)
795      when is_integer(n) and n >= 0
796      when n == :infinity do
797    take(interval(n), 1)
798  end
799
800  @doc """
801  Transforms an existing stream.
802
803  It expects an accumulator and a function that receives each stream element
804  and an accumulator. It must return a tuple, where the first element is a new
805  stream (often a list) or the atom `:halt`, and the second element is the
806  accumulator to be used by the next element, if any, in both cases.
807
808  Note: this function is equivalent to `Enum.flat_map_reduce/3`, except this
809  function does not return the accumulator once the stream is processed.
810
811  ## Examples
812
813  `Stream.transform/3` is useful as it can be used as the basis to implement
814  many of the functions defined in this module. For example, we can implement
815  `Stream.take(enum, n)` as follows:
816
817      iex> enum = 1001..9999
818      iex> n = 3
819      iex> stream = Stream.transform(enum, 0, fn i, acc ->
820      ...>   if acc < n, do: {[i], acc + 1}, else: {:halt, acc}
821      ...> end)
822      iex> Enum.to_list(stream)
823      [1001, 1002, 1003]
824
825  """
826  @spec transform(Enumerable.t(), acc, fun) :: Enumerable.t()
827        when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}),
828             acc: any
829  def transform(enum, acc, reducer) when is_function(reducer, 2) do
830    &do_transform(enum, fn -> acc end, reducer, &1, &2, nil)
831  end
832
833  @doc """
834  Transforms an existing stream with function-based start and finish.
835
836  The accumulator is only calculated when transformation starts. It also
837  allows an after function to be given which is invoked when the stream
838  halts or completes.
839
840  This function can be seen as a combination of `Stream.resource/3` with
841  `Stream.transform/3`.
842  """
843  @spec transform(Enumerable.t(), (() -> acc), fun, (acc -> term)) :: Enumerable.t()
844        when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}),
845             acc: any
846  def transform(enum, start_fun, reducer, after_fun)
847      when is_function(start_fun, 0) and is_function(reducer, 2) and is_function(after_fun, 1) do
848    &do_transform(enum, start_fun, reducer, &1, &2, after_fun)
849  end
850
851  defp do_transform(enumerables, user_acc, user, inner_acc, fun, after_fun) do
852    inner = &do_transform_each(&1, &2, fun)
853    step = &do_transform_step(&1, &2)
854    next = &Enumerable.reduce(enumerables, &1, step)
855    funs = {user, fun, inner, after_fun}
856    do_transform(user_acc.(), :cont, next, inner_acc, funs)
857  end
858
  # Main loop of transform. `user_acc` is the user accumulator, `next_op` is
  # `:cont` or `:halt`, `next` is the continuation over the source
  # enumerable, the fourth argument is the downstream accumulator command and
  # `funs` is `{user, fun, inner, after_fun}`.

  # Downstream asked to halt: halt the source, run the after function and
  # return the downstream accumulator as halted.
  defp do_transform(user_acc, _next_op, next, {:halt, inner_acc}, funs) do
    {_, _, _, after_fun} = funs
    next.({:halt, []})
    do_after(after_fun, user_acc)
    {:halted, inner_acc}
  end

  # Downstream asked to suspend: return a continuation that re-enters this
  # function with the same state and whatever command it is resumed with.
  defp do_transform(user_acc, next_op, next, {:suspend, inner_acc}, funs) do
    {:suspended, inner_acc, &do_transform(user_acc, next_op, next, &1, funs)}
  end

  # `next_op` is `:halt`, which the `:cont` clause below sets once the source
  # returns a non-suspended result: run the after function and finish.
  defp do_transform(user_acc, :halt, _next, {_, inner_acc}, funs) do
    {_, _, _, after_fun} = funs
    do_after(after_fun, user_acc)
    {:halted, inner_acc}
  end

  # Pull from the source. If the source raises, throws or exits, the after
  # function runs before the error is re-raised.
  defp do_transform(user_acc, :cont, next, inner_acc, funs) do
    {_, _, _, after_fun} = funs

    try do
      next.({:cont, []})
    catch
      kind, reason ->
        do_after(after_fun, user_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # The source suspended: process the collected values and keep going
      # with the new continuation.
      {:suspended, vals, next} ->
        do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs)

      # Any other result: process the collected values, then stop via the
      # `:halt` clause above.
      {_, vals} ->
        do_transform_user(:lists.reverse(vals), user_acc, :halt, next, inner_acc, funs)
    end
  end
893
  # Feeds the values pulled from the source to the user reducer, one at a time.

  # No values left: go back to the main loop.
  defp do_transform_user([], user_acc, next_op, next, inner_acc, funs) do
    do_transform(user_acc, next_op, next, inner_acc, funs)
  end

  defp do_transform_user([val | vals], user_acc, next_op, next, inner_acc, funs) do
    {user, fun, inner, after_fun} = funs

    try do
      user.(val, user_acc)
    catch
      # The user reducer failed: halt the source and run the after function
      # before re-raising.
      kind, reason ->
        next.({:halt, []})
        do_after(after_fun, user_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # Nothing to emit: move on to the next value.
      {[], user_acc} ->
        do_transform_user(vals, user_acc, next_op, next, inner_acc, funs)

      # A list is reduced directly with the downstream reducer `fun`.
      {list, user_acc} when is_list(list) ->
        reduce = &Enumerable.List.reduce(list, &1, fun)
        do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs)

      # The user reducer asked to halt: halt the source, run the after
      # function and return the current downstream accumulator.
      {:halt, user_acc} ->
        next.({:halt, []})
        do_after(after_fun, user_acc)
        {:halted, elem(inner_acc, 1)}

      # Any other enumerable is reduced with the `inner` wrapper reducer.
      {other, user_acc} ->
        reduce = &Enumerable.reduce(other, &1, inner)
        do_enum_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs)
    end
  end
926
  # Runs (or resumes) the reduction of a list returned by the user reducer.
  # `reduce` takes the downstream accumulator command and performs the reduction.
  defp do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) do
    {_, _, _, after_fun} = funs

    try do
      reduce.(inner_acc)
    catch
      # Failure while emitting: halt the source and run the after function
      # before re-raising.
      kind, reason ->
        next.({:halt, []})
        do_after(after_fun, user_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # The whole list was emitted: continue with the remaining values.
      {:done, acc} ->
        do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)

      # The reduction halted: halt the source and finish.
      {:halted, acc} ->
        next.({:halt, []})
        do_after(after_fun, user_acc)
        {:halted, acc}

      # The reduction suspended: return a continuation that resumes it here.
      {:suspended, acc, continuation} ->
        resume = &do_list_transform(vals, user_acc, next_op, next, &1, continuation, funs)
        {:suspended, acc, resume}
    end
  end
951
  # Runs `reduce` for an emitted enumerable. The accumulator is tagged with
  # `:outer` before the call and the leading tag is stripped from every
  # result before it is passed on or returned.
  defp do_enum_transform(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do
    {_, _, _, after_fun} = funs

    try do
      reduce.({op, [:outer | inner_acc]})
    catch
      kind, reason ->
        # Reduction failed: halt `next`, run the after callback and re-raise.
        next.({:halt, []})
        do_after(after_fun, user_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # Only take into account outer halts when the op is not halt itself.
      # Otherwise, we were the ones wishing to halt, so we should just stop.
      {:halted, [:outer | acc]} when op != :halt ->
        do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)

      # Any other halt ends the whole transform: halt `next`, run the after
      # callback and return.
      {:halted, [_ | acc]} ->
        next.({:halt, []})
        do_after(after_fun, user_acc)
        {:halted, acc}

      # The emitted enumerable was fully consumed, continue with the
      # remaining buffered values.
      {:done, [_ | acc]} ->
        do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)

      # On suspension, hand back a function that re-enters this function
      # with the continuation in place of `reduce`.
      {:suspended, [_ | acc], continuation} ->
        resume = &do_enum_transform(vals, user_acc, next_op, next, &1, continuation, funs)
        {:suspended, acc, resume}
    end
  end
981
982  defp do_after(nil, _user_acc), do: :ok
983  defp do_after(fun, user_acc), do: fun.(user_acc)
984
985  defp do_transform_each(x, [:outer | acc], f) do
986    case f.(x, acc) do
987      {:halt, res} -> {:halt, [:inner | res]}
988      {op, res} -> {op, [:outer | res]}
989    end
990  end
991
992  defp do_transform_step(x, acc) do
993    {:suspend, [x | acc]}
994  end
995
996  @doc """
997  Creates a stream that only emits elements if they are unique.
998
999  Keep in mind that, in order to know if an element is unique
1000  or not, this function needs to store all unique values emitted
1001  by the stream. Therefore, if the stream is infinite, the number
1002  of elements stored will grow infinitely, never being garbage-collected.
1003
1004  ## Examples
1005
1006      iex> Stream.uniq([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
1007      [1, 2, 3]
1008
1009  """
1010  @spec uniq(Enumerable.t()) :: Enumerable.t()
1011  def uniq(enum) do
1012    uniq_by(enum, fn x -> x end)
1013  end
1014
1015  @doc false
1016  @deprecated "Use Stream.uniq_by/2 instead"
1017  def uniq(enum, fun) do
1018    uniq_by(enum, fun)
1019  end
1020
1021  @doc """
1022  Creates a stream that only emits elements if they are unique, by removing the
1023  elements for which function `fun` returned duplicate elements.
1024
1025  The function `fun` maps every element to a term which is used to
1026  determine if two elements are duplicates.
1027
1028  Keep in mind that, in order to know if an element is unique
1029  or not, this function needs to store all unique values emitted
1030  by the stream. Therefore, if the stream is infinite, the number
1031  of elements stored will grow infinitely, never being garbage-collected.
1032
1033  ## Example
1034
1035      iex> Stream.uniq_by([{1, :x}, {2, :y}, {1, :z}], fn {x, _} -> x end) |> Enum.to_list()
1036      [{1, :x}, {2, :y}]
1037
1038      iex> Stream.uniq_by([a: {:tea, 2}, b: {:tea, 2}, c: {:coffee, 1}], fn {_, y} -> y end) |> Enum.to_list()
1039      [a: {:tea, 2}, c: {:coffee, 1}]
1040
1041  """
1042  @spec uniq_by(Enumerable.t(), (element -> term)) :: Enumerable.t()
1043  def uniq_by(enum, fun) when is_function(fun, 1) do
1044    lazy(enum, %{}, fn f1 -> R.uniq_by(fun, f1) end)
1045  end
1046
1047  @doc """
1048  Creates a stream where each element in the enumerable will
1049  be wrapped in a tuple alongside its index.
1050
1051  If an `offset` is given, we will index from the given offset instead of from zero.
1052
1053  ## Examples
1054
1055      iex> stream = Stream.with_index([1, 2, 3])
1056      iex> Enum.to_list(stream)
1057      [{1, 0}, {2, 1}, {3, 2}]
1058
1059      iex> stream = Stream.with_index([1, 2, 3], 3)
1060      iex> Enum.to_list(stream)
1061      [{1, 3}, {2, 4}, {3, 5}]
1062
1063  """
1064  @spec with_index(Enumerable.t(), integer) :: Enumerable.t()
1065  def with_index(enum, offset \\ 0) when is_integer(offset) do
1066    lazy(enum, offset, fn f1 -> R.with_index(f1) end)
1067  end
1068
1069  ## Combiners
1070
1071  @doc """
1072  Creates a stream that enumerates each enumerable in an enumerable.
1073
1074  ## Examples
1075
1076      iex> stream = Stream.concat([1..3, 4..6, 7..9])
1077      iex> Enum.to_list(stream)
1078      [1, 2, 3, 4, 5, 6, 7, 8, 9]
1079
1080  """
1081  @spec concat(Enumerable.t()) :: Enumerable.t()
1082  def concat(enumerables) do
1083    flat_map(enumerables, & &1)
1084  end
1085
1086  @doc """
1087  Creates a stream that enumerates the first argument, followed by the second.
1088
1089  ## Examples
1090
1091      iex> stream = Stream.concat(1..3, 4..6)
1092      iex> Enum.to_list(stream)
1093      [1, 2, 3, 4, 5, 6]
1094
1095      iex> stream1 = Stream.cycle([1, 2, 3])
1096      iex> stream2 = Stream.cycle([4, 5, 6])
1097      iex> stream = Stream.concat(stream1, stream2)
1098      iex> Enum.take(stream, 6)
1099      [1, 2, 3, 1, 2, 3]
1100
1101  """
1102  @spec concat(Enumerable.t(), Enumerable.t()) :: Enumerable.t()
1103  def concat(first, second) do
1104    flat_map([first, second], & &1)
1105  end
1106
1107  @doc """
1108  Zips two enumerables together, lazily.
1109
1110  The zipping finishes as soon as either enumerable completes.
1111
1112  ## Examples
1113
1114      iex> concat = Stream.concat(1..3, 4..6)
1115      iex> cycle = Stream.cycle([:a, :b, :c])
1116      iex> Stream.zip(concat, cycle) |> Enum.to_list()
1117      [{1, :a}, {2, :b}, {3, :c}, {4, :a}, {5, :b}, {6, :c}]
1118
1119  """
1120  @spec zip(Enumerable.t(), Enumerable.t()) :: Enumerable.t()
1121  def zip(enumerable1, enumerable2) do
1122    zip_with(enumerable1, enumerable2, fn left, right -> {left, right} end)
1123  end
1124
1125  @doc """
1126  Zips corresponding elements from a finite collection of enumerables
1127  into one stream of tuples.
1128
1129  The zipping finishes as soon as any enumerable in the given collection completes.
1130
1131  ## Examples
1132
1133      iex> concat = Stream.concat(1..3, 4..6)
1134      iex> cycle = Stream.cycle(["foo", "bar", "baz"])
1135      iex> Stream.zip([concat, [:a, :b, :c], cycle]) |> Enum.to_list()
1136      [{1, :a, "foo"}, {2, :b, "bar"}, {3, :c, "baz"}]
1137
1138  """
1139  @doc since: "1.4.0"
1140  @spec zip(enumerables) :: Enumerable.t() when enumerables: [Enumerable.t()] | Enumerable.t()
1141  def zip(enumerables) do
1142    zip_with(enumerables, &List.to_tuple(&1))
1143  end
1144
1145  @doc """
1146  Lazily zips corresponding elements from two enumerables into a new one, transforming them with
1147  the `zip_fun` function as it goes.
1148
1149  The `zip_fun` will be called with the first element from `enumerable1` and the first
1150  element from `enumerable2`, then with the second element from each, and so on until
1151  either one of the enumerables completes.
1152
1153  ## Examples
1154
1155      iex> concat = Stream.concat(1..3, 4..6)
1156      iex> Stream.zip_with(concat, concat, fn a, b -> a + b end) |> Enum.to_list()
1157      [2, 4, 6, 8, 10, 12]
1158
1159  """
1160  @doc since: "1.12.0"
1161  @spec zip_with(Enumerable.t(), Enumerable.t(), (term, term -> term)) :: Enumerable.t()
1162  def zip_with(enumerable1, enumerable2, zip_fun)
1163      when is_list(enumerable1) and is_list(enumerable2) and is_function(zip_fun, 2) do
1164    &zip_pair(enumerable1, enumerable2, &1, &2, zip_fun)
1165  end
1166
1167  def zip_with(enumerable1, enumerable2, zip_fun) when is_function(zip_fun, 2) do
1168    zip_with([enumerable1, enumerable2], fn [left, right] -> zip_fun.(left, right) end)
1169  end
1170
1171  defp zip_pair(_list1, _list2, {:halt, acc}, _fun, _zip_fun) do
1172    {:halted, acc}
1173  end
1174
1175  defp zip_pair(list1, list2, {:suspend, acc}, fun, zip_fun) do
1176    {:suspended, acc, &zip_pair(list1, list2, &1, fun, zip_fun)}
1177  end
1178
1179  defp zip_pair([], _list2, {:cont, acc}, _fun, _zip_fun), do: {:done, acc}
1180  defp zip_pair(_list1, [], {:cont, acc}, _fun, _zip_fun), do: {:done, acc}
1181
1182  defp zip_pair([head1 | tail1], [head2 | tail2], {:cont, acc}, fun, zip_fun) do
1183    zip_pair(tail1, tail2, fun.(zip_fun.(head1, head2), acc), fun, zip_fun)
1184  end
1185
1186  @doc """
1187  Lazily zips corresponding elements from a finite collection of enumerables into a new
1188  enumerable, transforming them with the `zip_fun` function as it goes.
1189
1190  The first element from each of the enums in `enumerables` will be put into a list which is then passed to
1191  the 1-arity `zip_fun` function. Then, the second elements from each of the enums are put into a list and passed to
1192  `zip_fun`, and so on until any one of the enums in `enumerables` completes.
1193
1194  Returns a new enumerable with the results of calling `zip_fun`.
1195
1196  ## Examples
1197
1198      iex> concat = Stream.concat(1..3, 4..6)
1199      iex> Stream.zip_with([concat, concat], fn [a, b] -> a + b end) |> Enum.to_list()
1200      [2, 4, 6, 8, 10, 12]
1201
1202      iex> concat = Stream.concat(1..3, 4..6)
1203      iex> Stream.zip_with([concat, concat, 1..3], fn [a, b, c] -> a + b + c end) |> Enum.to_list()
1204      [3, 6, 9]
1205
1206  """
1207  @doc since: "1.12.0"
1208  @spec zip_with(enumerables, (Enumerable.t() -> term)) :: Enumerable.t()
1209        when enumerables: [Enumerable.t()] | Enumerable.t()
1210  def zip_with(enumerables, zip_fun) when is_function(zip_fun, 1) do
1211    if is_list(enumerables) and :lists.all(&is_list/1, enumerables) do
1212      &zip_list(enumerables, &1, &2, zip_fun)
1213    else
1214      &zip_enum(enumerables, &1, &2, zip_fun)
1215    end
1216  end
1217
1218  defp zip_list(_enumerables, {:halt, acc}, _fun, _zip_fun) do
1219    {:halted, acc}
1220  end
1221
1222  defp zip_list(enumerables, {:suspend, acc}, fun, zip_fun) do
1223    {:suspended, acc, &zip_list(enumerables, &1, fun, zip_fun)}
1224  end
1225
1226  defp zip_list(enumerables, {:cont, acc}, fun, zip_fun) do
1227    case zip_list_heads_tails(enumerables, [], []) do
1228      {heads, tails} -> zip_list(tails, fun.(zip_fun.(heads), acc), fun, zip_fun)
1229      :error -> {:done, acc}
1230    end
1231  end
1232
1233  defp zip_list_heads_tails([[head | tail] | rest], heads, tails) do
1234    zip_list_heads_tails(rest, [head | heads], [tail | tails])
1235  end
1236
1237  defp zip_list_heads_tails([[] | _rest], _heads, _tails) do
1238    :error
1239  end
1240
1241  defp zip_list_heads_tails([], heads, tails) do
1242    {:lists.reverse(heads), :lists.reverse(tails)}
1243  end
1244
1245  defp zip_enum(enumerables, acc, fun, zip_fun) do
1246    step = fn x, acc ->
1247      {:suspend, :lists.reverse([x | acc])}
1248    end
1249
1250    enum_funs =
1251      Enum.map(enumerables, fn enum ->
1252        {&Enumerable.reduce(enum, &1, step), [], :cont}
1253      end)
1254
1255    do_zip_enum(enum_funs, acc, fun, zip_fun)
1256  end
1257
  # This implementation of do_zip_enum/4 works for any number of streams to zip.
  #
  # `zips` is a list of `{fun, buffered_elems, op}` triples, one per zipped
  # enumerable, as built by zip_enum/4.

  # The consumer halted: close every zipped stream before returning.
  defp do_zip_enum(zips, {:halt, acc}, _fun, _zip_fun) do
    do_zip_close(zips)
    {:halted, acc}
  end

  defp do_zip_enum(zips, {:suspend, acc}, fun, zip_fun) do
    {:suspended, acc, &do_zip_enum(zips, &1, fun, zip_fun)}
  end

  # Nothing to zip, so there is nothing to emit.
  defp do_zip_enum([], {:cont, acc}, _callback, _zip_fun) do
    {:done, acc}
  end

  defp do_zip_enum(zips, {:cont, acc}, callback, zip_fun) do
    try do
      do_zip_next(zips, acc, callback, [], [], zip_fun)
    catch
      kind, reason ->
        # Something raised while computing the next step: close the streams
        # and re-raise with the original stacktrace.
        do_zip_close(zips)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # A value was emitted; `acc` here is the command tuple returned by
      # `callback`, so loop with the updated triples.
      {:next, buffer, acc} ->
        do_zip_enum(buffer, acc, callback, zip_fun)

      # One of the streams finished; do_zip_next/6 already closed the rest.
      {:done, _acc} = other ->
        other
    end
  end
1287
  # do_zip_next/6 computes the next zipped value by taking the next element
  # of each zipped stream and passing them to `zip_fun`.
  #
  # Entries that were already processed are pushed onto `buffer` in reverse
  # order, so `:lists.reverse(buffer, zips)` rebuilds the full list of
  # entries when the streams have to be closed.

  # A stream marked with `:halt` that has no buffered elements left has
  # nothing more to give: close every stream and finish.
  defp do_zip_next(
         [{_, [], :halt} | zips],
         acc,
         _callback,
         _yielded_elems,
         buffer,
         _zip_fun
       ) do
    do_zip_close(:lists.reverse(buffer, zips))
    {:done, acc}
  end

  # No buffered elements: resume the stream to fetch its next element.
  defp do_zip_next([{fun, [], :cont} | zips], acc, callback, yielded_elems, buffer, zip_fun) do
    case fun.({:cont, []}) do
      # The stream suspended after yielding; keep its continuation.
      {:suspended, [elem | next_acc], fun} ->
        next_buffer = [{fun, next_acc, :cont} | buffer]
        do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)

      # The stream returned a final result that still carries an element:
      # use it and mark the entry with `:halt` so it is not resumed again.
      {_, [elem | next_acc]} ->
        next_buffer = [{fun, next_acc, :halt} | buffer]
        do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)

      {_, []} ->
        # The current zipped stream terminated, so we close all the streams
        # and return {:done, acc} (which is returned as is by do_zip_enum/4).
        do_zip_close(:lists.reverse(buffer, zips))
        {:done, acc}
    end
  end

  # The entry still has buffered elements: take the first one.
  defp do_zip_next(
         [{fun, zip_acc, zip_op} | zips],
         acc,
         callback,
         yielded_elems,
         buffer,
         zip_fun
       ) do
    [elem | rest] = zip_acc
    next_buffer = [{fun, rest, zip_op} | buffer]
    do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)
  end

  defp do_zip_next([] = _zips, acc, callback, yielded_elems, buffer, zip_fun) do
    # "yielded_elems" is a reversed list of results for the current iteration of
    # zipping. That is to say, the nth element from each of the enums being zipped.
    # It needs to be reversed and passed to the zipping function so it can do its thing.
    {:next, :lists.reverse(buffer), callback.(zip_fun.(:lists.reverse(yielded_elems)), acc)}
  end
1339
1340  defp do_zip_close(zips) do
1341    :lists.foreach(fn {fun, _, _} -> fun.({:halt, []}) end, zips)
1342  end
1343
1344  ## Sources
1345
1346  @doc """
1347  Creates a stream that cycles through the given enumerable,
1348  infinitely.
1349
1350  ## Examples
1351
1352      iex> stream = Stream.cycle([1, 2, 3])
1353      iex> Enum.take(stream, 5)
1354      [1, 2, 3, 1, 2]
1355
1356  """
1357  @spec cycle(Enumerable.t()) :: Enumerable.t()
1358  def cycle(enumerable)
1359
1360  def cycle([]) do
1361    raise ArgumentError, "cannot cycle over an empty enumerable"
1362  end
1363
1364  def cycle(enumerable) when is_list(enumerable) do
1365    unfold({enumerable, enumerable}, fn
1366      {source, [h | t]} -> {h, {source, t}}
1367      {source = [h | t], []} -> {h, {source, t}}
1368    end)
1369  end
1370
1371  def cycle(enumerable) do
1372    fn acc, fun ->
1373      step = &do_cycle_step(&1, &2)
1374      cycle = &Enumerable.reduce(enumerable, &1, step)
1375      reduce = check_cycle_first_element(cycle)
1376      do_cycle(reduce, [], cycle, acc, fun)
1377    end
1378  end
1379
1380  defp do_cycle(reduce, inner_acc, _cycle, {:halt, acc}, _fun) do
1381    reduce.({:halt, inner_acc})
1382    {:halted, acc}
1383  end
1384
1385  defp do_cycle(reduce, inner_acc, cycle, {:suspend, acc}, fun) do
1386    {:suspended, acc, &do_cycle(reduce, inner_acc, cycle, &1, fun)}
1387  end
1388
1389  defp do_cycle(reduce, inner_acc, cycle, {:cont, acc}, fun) do
1390    case reduce.({:cont, inner_acc}) do
1391      {:suspended, [element], new_reduce} ->
1392        do_cycle(new_reduce, inner_acc, cycle, fun.(element, acc), fun)
1393
1394      {_, [element]} ->
1395        do_cycle(cycle, [], cycle, fun.(element, acc), fun)
1396
1397      {_, []} ->
1398        do_cycle(cycle, [], cycle, {:cont, acc}, fun)
1399    end
1400  end
1401
1402  defp do_cycle_step(x, acc) do
1403    {:suspend, [x | acc]}
1404  end
1405
1406  defp check_cycle_first_element(reduce) do
1407    fn acc ->
1408      case reduce.(acc) do
1409        {state, []} when state in [:done, :halted] ->
1410          raise ArgumentError, "cannot cycle over an empty enumerable"
1411
1412        other ->
1413          other
1414      end
1415    end
1416  end
1417
1418  @doc """
1419  Emits a sequence of values, starting with `start_value`. Successive
1420  values are generated by calling `next_fun` on the previous value.
1421
1422  ## Examples
1423
1424      iex> Stream.iterate(0, &(&1 + 1)) |> Enum.take(5)
1425      [0, 1, 2, 3, 4]
1426
1427  """
1428  @spec iterate(element, (element -> element)) :: Enumerable.t()
1429  def iterate(start_value, next_fun) when is_function(next_fun, 1) do
1430    unfold({:ok, start_value}, fn
1431      {:ok, value} ->
1432        {value, {:next, value}}
1433
1434      {:next, value} ->
1435        next = next_fun.(value)
1436        {next, {:next, next}}
1437    end)
1438  end
1439
1440  @doc """
1441  Returns a stream generated by calling `generator_fun` repeatedly.
1442
1443  ## Examples
1444
1445      # Although not necessary, let's seed the random algorithm
1446      iex> :rand.seed(:exsss, {1, 2, 3})
1447      iex> Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3)
1448      [0.5455598952593053, 0.6039309974353404, 0.6684893034823949]
1449
1450  """
1451  @spec repeatedly((() -> element)) :: Enumerable.t()
1452  def repeatedly(generator_fun) when is_function(generator_fun, 0) do
1453    &do_repeatedly(generator_fun, &1, &2)
1454  end
1455
1456  defp do_repeatedly(generator_fun, {:suspend, acc}, fun) do
1457    {:suspended, acc, &do_repeatedly(generator_fun, &1, fun)}
1458  end
1459
1460  defp do_repeatedly(_generator_fun, {:halt, acc}, _fun) do
1461    {:halted, acc}
1462  end
1463
1464  defp do_repeatedly(generator_fun, {:cont, acc}, fun) do
1465    do_repeatedly(generator_fun, fun.(generator_fun.(), acc), fun)
1466  end
1467
1468  @doc """
1469  Emits a sequence of values for the given resource.
1470
1471  Similar to `transform/3` but the initial accumulated value is
1472  computed lazily via `start_fun` and executes an `after_fun` at
1473  the end of enumeration (both in cases of success and failure).
1474
1475  Successive values are generated by calling `next_fun` with the
1476  previous accumulator (the initial value being the result returned
1477  by `start_fun`) and it must return a tuple containing a list
1478  of elements to be emitted and the next accumulator. The enumeration
1479  finishes if it returns `{:halt, acc}`.
1480
1481  As the name says, this function is useful to stream values from
1482  resources.
1483
1484  ## Examples
1485
1486      Stream.resource(
1487        fn -> File.open!("sample") end,
1488        fn file ->
1489          case IO.read(file, :line) do
1490            data when is_binary(data) -> {[data], file}
1491            _ -> {:halt, file}
1492          end
1493        end,
1494        fn file -> File.close(file) end
1495      )
1496
1497      iex> Stream.resource(
1498      ...>  fn ->
1499      ...>    {:ok, pid} = StringIO.open("string")
1500      ...>    pid
1501      ...>  end,
1502      ...>  fn pid ->
1503      ...>    case IO.getn(pid, "", 1) do
1504      ...>      :eof -> {:halt, pid}
1505      ...>      char -> {[char], pid}
1506      ...>    end
1507      ...>  end,
1508      ...>  fn pid -> StringIO.close(pid) end
1509      ...> ) |> Enum.to_list()
1510      ["s", "t", "r", "i", "n", "g"]
1511
1512  """
1513  @spec resource((() -> acc), (acc -> {[element], acc} | {:halt, acc}), (acc -> term)) ::
1514          Enumerable.t()
1515  def resource(start_fun, next_fun, after_fun)
1516      when is_function(start_fun, 0) and is_function(next_fun, 1) and is_function(after_fun, 1) do
1517    &do_resource(start_fun.(), next_fun, &1, &2, after_fun)
1518  end
1519
  # Main loop of resource/3: asks `next_fun` for more elements and feeds
  # them to the reducer `fun`, making sure `after_fun` runs when the
  # enumeration halts or `next_fun` fails.
  defp do_resource(next_acc, next_fun, {:suspend, acc}, fun, after_fun) do
    {:suspended, acc, &do_resource(next_acc, next_fun, &1, fun, after_fun)}
  end

  # Halting releases the resource by calling `after_fun`.
  defp do_resource(next_acc, _next_fun, {:halt, acc}, _fun, after_fun) do
    after_fun.(next_acc)
    {:halted, acc}
  end

  defp do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun) do
    try do
      next_fun.(next_acc)
    catch
      kind, reason ->
        # `next_fun` failed: run `after_fun` with the accumulator we had
        # before the call and re-raise with the original stacktrace.
        after_fun.(next_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # `next_fun` asked to stop; the halt clause above runs `after_fun`.
      {:halt, next_acc} ->
        do_resource(next_acc, next_fun, {:halt, acc}, fun, after_fun)

      # Nothing to emit, ask for more.
      {[], next_acc} ->
        do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun)

      # A single element is passed to the reducer directly.
      {[v], next_acc} ->
        do_element_resource(next_acc, next_fun, acc, fun, after_fun, v)

      # Longer lists are reduced with `fun` via Enumerable.List.reduce/3.
      {list, next_acc} when is_list(list) ->
        reduce = &Enumerable.List.reduce(list, &1, fun)
        do_list_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun, reduce)

      # Any other value is reduced through the Enumerable protocol, with the
      # reducer wrapped by do_resource_each/3.
      {enum, next_acc} ->
        inner = &do_resource_each(&1, &2, fun)
        reduce = &Enumerable.reduce(enum, &1, inner)
        do_enum_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun, reduce)
    end
  end
1556
  # Emits the single element `v` by calling the reducer `fun` directly, then
  # goes back to do_resource/5 with whatever command the reducer returned.
  defp do_element_resource(next_acc, next_fun, acc, fun, after_fun, v) do
    try do
      fun.(v, acc)
    catch
      kind, reason ->
        # The reducer failed: run `after_fun` and re-raise.
        after_fun.(next_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      acc ->
        do_resource(next_acc, next_fun, acc, fun, after_fun)
    end
  end
1569
  # Runs `reduce` (the reduction over a list returned by `next_fun`, or the
  # continuation of a suspended one) and maps its result back onto
  # do_resource/5.
  defp do_list_resource(next_acc, next_fun, acc, fun, after_fun, reduce) do
    try do
      reduce.(acc)
    catch
      kind, reason ->
        # The reduction failed: run `after_fun` and re-raise.
        after_fun.(next_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # The list was fully emitted, ask `next_fun` for more.
      {:done, acc} ->
        do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun)

      # The reduction halted, so halt the resource too.
      {:halted, acc} ->
        do_resource(next_acc, next_fun, {:halt, acc}, fun, after_fun)

      # On suspension, resume this same function with the continuation.
      {:suspended, acc, c} ->
        {:suspended, acc, &do_list_resource(next_acc, next_fun, &1, fun, after_fun, c)}
    end
  end
1588
  # Runs `reduce` for an enumerable returned by `next_fun`.
  #
  # The accumulator is tagged with `:outer` before the call. The reducer
  # wrapper do_resource_each/3 replaces the tag with `:inner` when the
  # reducer itself halts, which is how the two kinds of halt below are
  # told apart. The tag is stripped from every result.
  defp do_enum_resource(next_acc, next_fun, {op, acc}, fun, after_fun, reduce) do
    try do
      reduce.({op, [:outer | acc]})
    catch
      kind, reason ->
        # The reduction failed: run `after_fun` and re-raise.
        after_fun.(next_acc)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      # Halted while still tagged `:outer`: the reducer did not ask to
      # halt, so keep asking `next_fun` for more.
      {:halted, [:outer | acc]} ->
        do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun)

      # The reducer asked to halt, so halt the resource too.
      {:halted, [:inner | acc]} ->
        do_resource(next_acc, next_fun, {:halt, acc}, fun, after_fun)

      # The enumerable was fully emitted, ask `next_fun` for more.
      {:done, [_ | acc]} ->
        do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun)

      # On suspension, resume this same function with the continuation.
      {:suspended, [_ | acc], c} ->
        {:suspended, acc, &do_enum_resource(next_acc, next_fun, &1, fun, after_fun, c)}
    end
  end
1610
1611  defp do_resource_each(x, [:outer | acc], f) do
1612    case f.(x, acc) do
1613      {:halt, res} -> {:halt, [:inner | res]}
1614      {op, res} -> {op, [:outer | res]}
1615    end
1616  end
1617
1618  @doc """
1619  Emits a sequence of values for the given accumulator.
1620
1621  Successive values are generated by calling `next_fun` with the previous
1622  accumulator and it must return a tuple with the current value and next
1623  accumulator. The enumeration finishes if it returns `nil`.
1624
1625  ## Examples
1626
1627      iex> Stream.unfold(5, fn
1628      ...>   0 -> nil
1629      ...>   n -> {n, n - 1}
1630      ...> end) |> Enum.to_list()
1631      [5, 4, 3, 2, 1]
1632
1633  """
1634  @spec unfold(acc, (acc -> {element, acc} | nil)) :: Enumerable.t()
1635  def unfold(next_acc, next_fun) when is_function(next_fun, 1) do
1636    &do_unfold(next_acc, next_fun, &1, &2)
1637  end
1638
1639  defp do_unfold(next_acc, next_fun, {:suspend, acc}, fun) do
1640    {:suspended, acc, &do_unfold(next_acc, next_fun, &1, fun)}
1641  end
1642
1643  defp do_unfold(_next_acc, _next_fun, {:halt, acc}, _fun) do
1644    {:halted, acc}
1645  end
1646
1647  defp do_unfold(next_acc, next_fun, {:cont, acc}, fun) do
1648    case next_fun.(next_acc) do
1649      nil -> {:done, acc}
1650      {v, next_acc} -> do_unfold(next_acc, next_fun, fun.(v, acc), fun)
1651    end
1652  end
1653
1654  @doc """
1655  Lazily intersperses `intersperse_element` between each element of the enumeration.
1656
1657  ## Examples
1658
1659      iex> Stream.intersperse([1, 2, 3], 0) |> Enum.to_list()
1660      [1, 0, 2, 0, 3]
1661
1662      iex> Stream.intersperse([1], 0) |> Enum.to_list()
1663      [1]
1664
1665      iex> Stream.intersperse([], 0) |> Enum.to_list()
1666      []
1667
1668  """
1669  @doc since: "1.6.0"
1670  @spec intersperse(Enumerable.t(), any) :: Enumerable.t()
1671  def intersperse(enumerable, intersperse_element) do
1672    Stream.transform(enumerable, false, fn
1673      element, true -> {[intersperse_element, element], true}
1674      element, false -> {[element], true}
1675    end)
1676  end
1677
1678  ## Helpers
1679
1680  @compile {:inline, lazy: 2, lazy: 3, lazy: 4}
1681
1682  defp lazy(%Stream{done: nil, funs: funs} = lazy, fun), do: %{lazy | funs: [fun | funs]}
1683  defp lazy(enum, fun), do: %Stream{enum: enum, funs: [fun]}
1684
1685  defp lazy(%Stream{done: nil, funs: funs, accs: accs} = lazy, acc, fun),
1686    do: %{lazy | funs: [fun | funs], accs: [acc | accs]}
1687
1688  defp lazy(enum, acc, fun), do: %Stream{enum: enum, funs: [fun], accs: [acc]}
1689
1690  defp lazy(%Stream{done: nil, funs: funs, accs: accs} = lazy, acc, fun, done),
1691    do: %{lazy | funs: [fun | funs], accs: [acc | accs], done: done}
1692
1693  defp lazy(enum, acc, fun, done), do: %Stream{enum: enum, funs: [fun], accs: [acc], done: done}
1694end
1695
# Enumerable implementation for the %Stream{} struct: composes the stored
# stage functions into a single reducer and runs it over the wrapped
# enumerable.
defimpl Enumerable, for: Stream do
  @compile :inline_list_funcs

  # count/1, member?/2 and slice/1 all return `{:error, __MODULE__}`; see
  # the Enumerable protocol documentation for how callers treat that value.
  def count(_lazy), do: {:error, __MODULE__}

  def member?(_lazy, _value), do: {:error, __MODULE__}

  def slice(_lazy), do: {:error, __MODULE__}

  def reduce(lazy, acc, fun) do
    # The caller's reducer is wrapped so that it takes and returns its
    # accumulator inside a single-element list.
    do_reduce(lazy, acc, fn x, [acc] ->
      {reason, acc} = fun.(x, acc)
      {reason, [acc]}
    end)
  end

  defp do_reduce(%Stream{enum: enum, funs: funs, accs: accs, done: done}, acc, fun) do
    # Each stored function receives the reducer built so far and returns a
    # new one, starting from `fun`.
    composed = :lists.foldl(fn entry_fun, acc -> entry_fun.(acc) end, fun, funs)
    reduce = &Enumerable.reduce(enum, &1, composed)
    # `done && {done, fun}` is nil when the stream has no done callback.
    do_each(reduce, done && {done, fun}, :lists.reverse(accs), acc)
  end

  # Runs one round of the reduction with the caller's accumulator placed in
  # front of the stage accumulators.
  defp do_each(reduce, done, accs, {command, acc}) do
    case reduce.({command, [acc | accs]}) do
      # On suspension only the caller's accumulator is exposed; the stage
      # accumulators are kept in the continuation.
      {:suspended, [acc | accs], continuation} ->
        {:suspended, acc, &do_each(continuation, done, accs, &1)}

      {:halted, accs} ->
        do_done({:halted, accs}, done)

      {:done, accs} ->
        do_done({:done, accs}, done)
    end
  end

  # Without a done callback the caller's accumulator is returned as is.
  defp do_done({reason, [acc | _]}, nil), do: {reason, acc}

  defp do_done({reason, [acc | t]}, {done, fun}) do
    # `h` is the last of the remaining accumulators.
    [h | _] = :lists.reverse(t)

    # The done callback gets the caller's accumulator, `h` and the wrapped
    # reducer; the command it returns decides the final result.
    case done.([acc, h], fun) do
      {:cont, [acc | _]} -> {reason, acc}
      {:halt, [acc | _]} -> {:halted, acc}
      {:suspend, [acc | _]} -> {:suspended, acc, &{:done, elem(&1, 1)}}
    end
  end
end
1743
# Inspect implementation for streams: renders the wrapped enumerable and
# the stage functions, in reverse of the order they are stored in, as
# `#Stream<[enum: ..., funs: ...]>`.
defimpl Inspect, for: Stream do
  import Inspect.Algebra

  def inspect(%{enum: enum, funs: funs}, opts) do
    fields = [enum: enum, funs: :lists.reverse(funs)]
    body = to_doc(fields, opts)
    concat(["#Stream<", body, ">"])
  end
end
1752