1defmodule Couch.DBTest do
2  @moduledoc false
3
4  import ExUnit.Callbacks, only: [on_exit: 1]
5  import ExUnit.Assertions, only: [assert: 1, assert: 2]
6
7  def set_db_context(context) do
8    context =
9      case context do
10        %{:with_db_name => true} ->
11          Map.put(context, :db_name, random_db_name())
12
13        %{:with_db_name => db_name} when is_binary(db_name) ->
14          Map.put(context, :db_name, db_name)
15
16        %{:with_random_db => db_name} when is_binary(db_name) ->
17          context
18          |> Map.put(:db_name, random_db_name(db_name))
19          |> Map.put(:with_db, true)
20
21        %{:with_partitioned_db => true} ->
22          context
23          |> Map.put(:db_name, random_db_name())
24          |> Map.put(:query, %{partitioned: true})
25          |> Map.put(:with_db, true)
26
27        %{:with_db => true} ->
28          Map.put(context, :db_name, random_db_name())
29
30        %{:with_db => db_name} when is_binary(db_name) ->
31          Map.put(context, :db_name, db_name)
32
33        _ ->
34          context
35      end
36
37    if Map.has_key?(context, :with_db) do
38      {:ok, _} = create_db(context[:db_name], query: context[:query])
39      on_exit(fn -> delete_db(context[:db_name]) end)
40    end
41
42    context
43  end
44
45  def set_config_context(context) do
46    if is_list(context[:config]) do
47      Enum.each(context[:config], fn cfg ->
48        set_config(cfg)
49      end)
50    end
51
52    context
53  end
54
55  def set_user_context(context) do
56    case Map.get(context, :user) do
57      nil ->
58        context
59
60      user when is_list(user) ->
61        user = create_user(user)
62
63        on_exit(fn ->
64          query = %{:rev => user["_rev"]}
65          resp = Couch.delete("/_users/#{user["_id"]}", query: query)
66          assert HTTPotion.Response.success?(resp)
67        end)
68
69        context = Map.put(context, :user, user)
70        userinfo = user["name"] <> ":" <> user["password"]
71        Map.put(context, :userinfo, userinfo)
72    end
73  end
74
75  def random_db_name do
76    random_db_name("random-test-db")
77  end
78
79  def random_db_name(prefix) do
80    time = :erlang.monotonic_time()
81    umi = :erlang.unique_integer([:monotonic])
82    "#{prefix}-#{time}-#{umi}"
83  end
84
85  def set_config({section, key, value}) do
86    existing = set_config_raw(section, key, value)
87
88    on_exit(fn ->
89      Enum.each(existing, fn {node, prev_value} ->
90        if prev_value != "" do
91          url = "/_node/#{node}/_config/#{section}/#{key}"
92          headers = ["X-Couch-Persist": "false"]
93          body = :jiffy.encode(prev_value)
94          resp = Couch.put(url, headers: headers, body: body)
95          assert resp.status_code == 200
96        else
97          url = "/_node/#{node}/_config/#{section}/#{key}"
98          headers = ["X-Couch-Persist": "false"]
99          resp = Couch.delete(url, headers: headers)
100          assert resp.status_code == 200
101        end
102      end)
103    end)
104  end
105
106  def set_config_raw(section, key, value) do
107    resp = Couch.get("/_membership")
108
109    Enum.map(resp.body["all_nodes"], fn node ->
110      url = "/_node/#{node}/_config/#{section}/#{key}"
111      headers = ["X-Couch-Persist": "false"]
112      body = :jiffy.encode(value)
113      resp = Couch.put(url, headers: headers, body: body)
114      assert resp.status_code == 200
115      {node, resp.body}
116    end)
117  end
118
119  def prepare_user_doc(user) do
120    required = [:name, :password]
121
122    Enum.each(required, fn key ->
123      assert Keyword.has_key?(user, key), "User missing key: #{key}"
124    end)
125
126    id = Keyword.get(user, :id)
127    name = Keyword.get(user, :name)
128    password = Keyword.get(user, :password)
129    roles = Keyword.get(user, :roles, [])
130
131    assert is_binary(name), "User name must be a string"
132    assert is_binary(password), "User password must be a string"
133    assert is_list(roles), "Roles must be a list of strings"
134
135    Enum.each(roles, fn role ->
136      assert is_binary(role), "Roles must be a list of strings"
137    end)
138
139    %{
140      "_id" => id || "org.couchdb.user:" <> name,
141      "type" => "user",
142      "name" => name,
143      "roles" => roles,
144      "password" => password
145    }
146  end
147
148  def create_user(user) do
149    user_doc = prepare_user_doc(user)
150    resp = Couch.get("/_users/#{user_doc["_id"]}")
151
152    user_doc =
153      case resp.status_code do
154        404 ->
155          user_doc
156
157        sc when sc >= 200 and sc < 300 ->
158          Map.put(user_doc, "_rev", resp.body["_rev"])
159      end
160
161    resp = Couch.post("/_users", body: user_doc)
162    assert HTTPotion.Response.success?(resp)
163    assert resp.body["ok"]
164    Map.put(user_doc, "_rev", resp.body["rev"])
165  end
166
167  def create_db(db_name, opts \\ []) do
168    retry_until(fn ->
169      resp = Couch.put("/#{db_name}", opts)
170      assert resp.status_code in [201, 202]
171      assert resp.body == %{"ok" => true}
172      {:ok, resp}
173    end)
174  end
175
176  def delete_db(db_name) do
177    resp = Couch.delete("/#{db_name}")
178    assert resp.status_code in [200, 202, 404]
179    {:ok, resp}
180  end
181
182  def create_doc(db_name, body) do
183    resp = Couch.post("/#{db_name}", body: body)
184    assert resp.status_code in [201, 202]
185    assert resp.body["ok"]
186    {:ok, resp}
187  end
188
189  def info(db_name) do
190    resp = Couch.get("/#{db_name}")
191    assert resp.status_code == 200
192    resp.body
193  end
194
195  def save(db_name, document) do
196    resp = Couch.put("/#{db_name}/#{document["_id"]}", body: document)
197    assert resp.status_code in [201, 202]
198    assert resp.body["ok"]
199    Map.put(document, "_rev", resp.body["rev"])
200  end
201
202  def bulk_save(db_name, docs) do
203    resp =
204      Couch.post(
205        "/#{db_name}/_bulk_docs",
206        body: %{
207          docs: docs
208        }
209      )
210
211    assert resp.status_code in [201, 202]
212    resp
213  end
214
215  def query(
216        db_name,
217        map_fun,
218        reduce_fun \\ nil,
219        options \\ nil,
220        keys \\ nil,
221        language \\ "javascript"
222      ) do
223    l_map_function =
224      if language == "javascript" do
225        "#{map_fun} /* avoid race cond #{now(:ms)} */"
226      else
227        map_fun
228      end
229
230    view = %{
231      :map => l_map_function
232    }
233
234    view =
235      if reduce_fun != nil do
236        Map.put(view, :reduce, reduce_fun)
237      else
238        view
239      end
240
241    {view, request_options} =
242      if options != nil and Map.has_key?(options, :options) do
243        {Map.put(view, :options, options.options), Map.delete(options, :options)}
244      else
245        {view, options}
246      end
247
248    ddoc_name = "_design/temp_#{now(:ms)}"
249
250    ddoc = %{
251      _id: ddoc_name,
252      language: language,
253      views: %{
254        view: view
255      }
256    }
257
258    request_options =
259      if keys != nil and is_list(keys) do
260        Map.merge(request_options || %{}, %{:keys => :jiffy.encode(keys)})
261      else
262        request_options
263      end
264
265    resp =
266      Couch.put(
267        "/#{db_name}/#{ddoc_name}",
268        headers: ["Content-Type": "application/json"],
269        body: ddoc
270      )
271
272    assert resp.status_code in [201, 202]
273
274    resp = Couch.get("/#{db_name}/#{ddoc_name}/_view/view", query: request_options)
275    assert resp.status_code == 200
276
277    Couch.delete("/#{db_name}/#{ddoc_name}")
278
279    resp.body
280  end
281
282  def compact(db_name) do
283    resp = Couch.post("/#{db_name}/_compact")
284    assert resp.status_code == 202
285
286    retry_until(
287      fn -> Map.get(info(db_name), "compact_running") == false end,
288      200,
289      10_000
290    )
291
292    resp.body
293  end
294
295  def replicate(src, tgt, options \\ []) do
296    username = System.get_env("EX_USERNAME") || "adm"
297    password = System.get_env("EX_PASSWORD") || "pass"
298
299    {userinfo, options} = Keyword.pop(options, :userinfo)
300
301    userinfo =
302      if userinfo == nil do
303        "#{username}:#{password}"
304      else
305        userinfo
306      end
307
308    src = set_user(src, userinfo)
309    tgt = set_user(tgt, userinfo)
310
311    defaults = [headers: [], body: %{}, timeout: 30_000]
312    options = defaults |> Keyword.merge(options) |> Enum.into(%{})
313
314    %{body: body} = options
315    body = [source: src, target: tgt] |> Enum.into(body)
316    options = Map.put(options, :body, body)
317
318    resp = Couch.post("/_replicate", Enum.to_list(options))
319    assert HTTPotion.Response.success?(resp), "#{inspect(resp)}"
320    resp.body
321  end
322
323  defp set_user(uri, userinfo) do
324    case URI.parse(uri) do
325      %{scheme: nil} ->
326        uri
327
328      %{userinfo: nil} = uri ->
329        URI.to_string(Map.put(uri, :userinfo, userinfo))
330
331      _ ->
332        uri
333    end
334  end
335
336  def view(db_name, view_name, options \\ nil, keys \\ nil) do
337    [view_root, view_name] = String.split(view_name, "/")
338
339    resp =
340      case keys do
341        nil ->
342          Couch.get("/#{db_name}/_design/#{view_root}/_view/#{view_name}", query: options)
343
344        _ ->
345          Couch.post("/#{db_name}/_design/#{view_root}/_view/#{view_name}", query: options,
346            body: %{"keys" => keys}
347          )
348      end
349
350    assert resp.status_code in [200, 201]
351    resp
352  end
353
354  def sample_doc_foo do
355    %{
356      _id: "foo",
357      bar: "baz"
358    }
359  end
360
361  # Generate range of docs with strings as keys
362  def make_docs(id_range) do
363    for id <- id_range, str_id = Integer.to_string(id) do
364      %{"_id" => str_id, "integer" => id, "string" => str_id}
365    end
366  end
367
368  # Generate range of docs based on a template
369  def make_docs(id_range, template_doc) do
370    for id <- id_range, str_id = Integer.to_string(id) do
371      Map.merge(template_doc, %{"_id" => str_id})
372    end
373  end
374
375  # Generate range of docs with atoms as keys, which are more
376  # idiomatic, and are encoded by jiffy to binaries
377  def create_docs(id_range) do
378    for id <- id_range, str_id = Integer.to_string(id) do
379      %{_id: str_id, integer: id, string: str_id}
380    end
381  end
382
383  def request_stats(path_steps, is_test) do
384    path =
385      List.foldl(
386        path_steps,
387        "/_node/_local/_stats",
388        fn p, acc ->
389          "#{acc}/#{p}"
390        end
391      )
392
393    path =
394      if is_test do
395        path <> "?flush=true"
396      else
397        path
398      end
399
400    Couch.get(path).body
401  end
402
403  def retry_until(condition, sleep \\ 100, timeout \\ 30_000) do
404    retry_until(condition, now(:ms), sleep, timeout)
405  end
406
407  defp retry_until(condition, start, sleep, timeout) do
408    now = now(:ms)
409
410    if now > start + timeout do
411      raise "timed out after #{now - start} ms"
412    else
413      try do
414        if result = condition.() do
415          result
416        else
417          raise ExUnit.AssertionError
418        end
419      rescue
420        ExUnit.AssertionError ->
421          :timer.sleep(sleep)
422          retry_until(condition, start, sleep, timeout)
423      end
424    end
425  end
426
427  defp now(:ms) do
428    case elem(:os.type, 0) do
429      :win32 ->
430        div(:erlang.system_time(), 1_000)
431      _ ->
432        div(:erlang.system_time(), 1_000_000)
433    end
434  end
435
436  @spec rev(map(), map()) :: map()
437  def rev(doc = %{_id: id}, %{"id" => id, "rev" => rev}) do
438    Map.put(doc, :_rev, rev)
439  end
440
441  @spec rev([map()], [map()]) :: [map()]
442  def rev(docs, rows) when length(docs) == length(rows) do
443    for {doc, row} <- Enum.zip(docs, rows), do: rev(doc, row)
444  end
445
446  def pretty_inspect(resp) do
447    opts = [pretty: true, width: 20, limit: :infinity, printable_limit: :infinity]
448    inspect(resp, opts)
449  end
450
451  def run_on_modified_server(settings, fun) do
452    resp = Couch.get("/_membership")
453    assert resp.status_code == 200
454    nodes = resp.body["all_nodes"]
455
456    prev_settings =
457      Enum.map(settings, fn setting ->
458        prev_setting_node =
459          Enum.reduce(nodes, %{}, fn node, acc ->
460            resp =
461              Couch.put(
462                "/_node/#{node}/_config/#{setting.section}/#{setting.key}",
463                headers: ["X-Couch-Persist": false],
464                body: :jiffy.encode(setting.value)
465              )
466
467            assert resp.status_code == 200
468            Map.put(acc, node, resp.body)
469          end)
470
471        Map.put(setting, :nodes, Map.to_list(prev_setting_node))
472      end)
473
474    try do
475      fun.()
476    after
477      Enum.each(prev_settings, fn setting ->
478        Enum.each(setting.nodes, fn node_value ->
479          node = elem(node_value, 0)
480          value = elem(node_value, 1)
481
482          if value == ~s(""\\n) or value == "" or value == nil do
483            resp =
484              Couch.delete(
485                "/_node/#{node}/_config/#{setting.section}/#{setting.key}",
486                headers: ["X-Couch-Persist": false]
487              )
488
489            assert resp.status_code == 200
490          else
491            resp =
492              Couch.put(
493                "/_node/#{node}/_config/#{setting.section}/#{setting.key}",
494                headers: ["X-Couch-Persist": false],
495                body: :jiffy.encode(value)
496              )
497
498            assert resp.status_code == 200
499          end
500        end)
501      end)
502    end
503  end
504
505  def restart_cluster do
506    resp = Couch.get("/_membership")
507    assert resp.status_code == 200
508    nodes = resp.body["all_nodes"]
509
510    nodes_ports =
511      Enum.reduce(nodes, [], fn node, acc ->
512        port = node_to_port(node)
513        [{node, port} | acc]
514      end)
515
516    tasks =
517      Enum.map(nodes_ports, fn {node, port} ->
518        Task.async(fn -> restart_node(node, port) end)
519      end)
520
521    Task.yield_many(tasks, length(nodes) * 5000)
522  end
523
524  def restart_node(node \\ "node1@127.0.0.1") do
525    port = node_to_port(node)
526    restart_node(node, port)
527  end
528
529  defp restart_node(node, port) do
530    url = "http://127.0.0.1:#{port}/_node/#{node}/_restart"
531    resp = Couch.post(url)
532    assert HTTPotion.Response.success?(resp)
533    assert resp.body["ok"]
534    # make sure node went down. we assuming the node can't bounce quick
535    # enough to inroduce a race here
536    retry_until(fn -> !node_is_running(port) end)
537    # wait utill node is back
538    retry_until(fn -> node_is_running(port) end, 500, 30_000)
539  end
540
541  defp node_is_running(port) do
542    url = "http://127.0.0.1:#{port}/_up"
543    resp = Couch.get(url)
544
545    case HTTPotion.Response.success?(resp) do
546      true -> resp.status_code in 200..399
547      false -> false
548    end
549  end
550
551  defp node_to_port(node) do
552    url = "/_node/#{node}/_config/chttpd/port"
553    resp = Couch.get(url)
554    assert HTTPotion.Response.success?(resp)
555    resp.body
556  end
557end
558