1__all__ = ["Pregel"]
2
3from typing import Optional
4
5from arango.api import ApiGroup
6from arango.exceptions import (
7    PregelJobCreateError,
8    PregelJobDeleteError,
9    PregelJobGetError,
10)
11from arango.formatter import format_pregel_job_data
12from arango.request import Request
13from arango.response import Response
14from arango.result import Result
15from arango.typings import Json
16
17
class Pregel(ApiGroup):
    """Pregel API wrapper.

    Provides methods to start, inspect and delete Pregel jobs through the
    ``/_api/control_pregel`` HTTP endpoints.
    """

    def __repr__(self) -> str:
        """Return a short description naming the database of the connection."""
        return f"<Pregel in {self._conn.db_name}>"

    def job(self, job_id: int) -> Result[Json]:
        """Return the details of a Pregel job.

        Sends a GET request to ``/_api/control_pregel/{job_id}`` and passes the
        response body through ``format_pregel_job_data``.

        :param job_id: Pregel job ID.
        :type job_id: int
        :return: Details of the Pregel job.
        :rtype: dict
        :raise arango.exceptions.PregelJobGetError: If retrieval fails.
        """
        request = Request(method="get", endpoint=f"/_api/control_pregel/{job_id}")

        def response_handler(resp: Response) -> Json:
            if resp.is_success:
                return format_pregel_job_data(resp.body)
            raise PregelJobGetError(resp, request)

        return self._execute(request, response_handler)

    def create_job(
        self,
        graph: str,
        algorithm: str,
        store: bool = True,
        max_gss: Optional[int] = None,
        thread_count: Optional[int] = None,
        async_mode: Optional[bool] = None,
        result_field: Optional[str] = None,
        algorithm_params: Optional[Json] = None,
    ) -> Result[int]:
        """Start a new Pregel job.

        The keyword arguments ``store``, ``max_gss``, ``thread_count``,
        ``async_mode`` and ``result_field`` are merged into the job parameters
        and take precedence over same-named keys ("store", "maxGSS",
        "parallelism", "async", "resultField") in ``algorithm_params``. The
        dictionary passed as ``algorithm_params`` is not modified.

        :param graph: Graph name.
        :type graph: str
        :param algorithm: Algorithm (e.g. "pagerank").
        :type algorithm: str
        :param store: If set to True, Pregel engine writes results back to the
            database. If set to False, results can be queried via AQL.
        :type store: bool
        :param max_gss: Max number of global iterations for the algorithm.
        :type max_gss: int | None
        :param thread_count: Number of parallel threads to use per worker.
            This does not influence the number of threads used to load or store
            data from the database (it depends on the number of shards).
        :type thread_count: int | None
        :param async_mode: If set to True, algorithms which support async mode
            run without synchronized global iterations. This might lead to
            performance increase if there are load imbalances.
        :type async_mode: bool | None
        :param result_field: If specified, most algorithms will write their
            results into this field.
        :type result_field: str | None
        :param algorithm_params: Additional algorithm parameters.
        :type algorithm_params: dict | None
        :return: Pregel job ID.
        :rtype: int
        :raise arango.exceptions.PregelJobCreateError: If create fails.
        """
        data: Json = {"algorithm": algorithm, "graphName": graph}

        # Work on a private shallow copy so the caller's dictionary is never
        # mutated (and can safely be reused for subsequent jobs).
        params: Json = {} if algorithm_params is None else dict(algorithm_params)

        # Only options that were explicitly provided are sent to the server.
        if store is not None:
            params["store"] = store
        if max_gss is not None:
            params["maxGSS"] = max_gss
        if thread_count is not None:
            params["parallelism"] = thread_count
        if async_mode is not None:
            params["async"] = async_mode
        if result_field is not None:
            params["resultField"] = result_field

        # Omit the "params" key entirely when there is nothing to send.
        if params:
            data["params"] = params

        request = Request(method="post", endpoint="/_api/control_pregel", data=data)

        def response_handler(resp: Response) -> int:
            if resp.is_success:
                # The response body is converted to the integer job ID.
                return int(resp.body)
            raise PregelJobCreateError(resp, request)

        return self._execute(request, response_handler)

    def delete_job(self, job_id: int) -> Result[bool]:
        """Delete a Pregel job.

        Sends a DELETE request to ``/_api/control_pregel/{job_id}``.

        :param job_id: Pregel job ID.
        :type job_id: int
        :return: True if Pregel job was deleted successfully.
        :rtype: bool
        :raise arango.exceptions.PregelJobDeleteError: If delete fails.
        """
        request = Request(method="delete", endpoint=f"/_api/control_pregel/{job_id}")

        def response_handler(resp: Response) -> bool:
            if resp.is_success:
                return True
            raise PregelJobDeleteError(resp, request)

        return self._execute(request, response_handler)
125