__all__ = ["Pregel"]

from typing import Optional

from arango.api import ApiGroup
from arango.exceptions import (
    PregelJobCreateError,
    PregelJobDeleteError,
    PregelJobGetError,
)
from arango.formatter import format_pregel_job_data
from arango.request import Request
from arango.response import Response
from arango.result import Result
from arango.typings import Json


class Pregel(ApiGroup):
    """Pregel API wrapper."""

    def __repr__(self) -> str:
        return f"<Pregel in {self._conn.db_name}>"

    def job(self, job_id: int) -> Result[Json]:
        """Return the details of a Pregel job.

        :param job_id: Pregel job ID.
        :type job_id: int
        :return: Details of the Pregel job.
        :rtype: dict
        :raise arango.exceptions.PregelJobGetError: If retrieval fails.
        """
        request = Request(method="get", endpoint=f"/_api/control_pregel/{job_id}")

        def response_handler(resp: Response) -> Json:
            """Format the job details or raise on an unsuccessful response."""
            if resp.is_success:
                return format_pregel_job_data(resp.body)
            raise PregelJobGetError(resp, request)

        return self._execute(request, response_handler)

    def create_job(
        self,
        graph: str,
        algorithm: str,
        store: bool = True,
        max_gss: Optional[int] = None,
        thread_count: Optional[int] = None,
        async_mode: Optional[bool] = None,
        result_field: Optional[str] = None,
        algorithm_params: Optional[Json] = None,
    ) -> Result[int]:
        """Start a new Pregel job.

        The dedicated keyword arguments (``store``, ``max_gss``,
        ``thread_count``, ``async_mode``, ``result_field``) override keys of
        the same meaning in ``algorithm_params`` whenever they are not None.
        The ``algorithm_params`` dictionary passed by the caller is never
        modified.

        :param graph: Graph name.
        :type graph: str
        :param algorithm: Algorithm (e.g. "pagerank").
        :type algorithm: str
        :param store: If set to True, Pregel engine writes results back to the
            database. If set to False, results can be queried via AQL.
        :type store: bool
        :param max_gss: Max number of global iterations for the algorithm.
        :type max_gss: int | None
        :param thread_count: Number of parallel threads to use per worker.
            This does not influence the number of threads used to load or store
            data from the database (it depends on the number of shards).
        :type thread_count: int | None
        :param async_mode: If set to True, algorithms which support async mode
            run without synchronized global iterations. This might lead to
            performance increase if there are load imbalances.
        :type async_mode: bool | None
        :param result_field: If specified, most algorithms will write their
            results into this field.
        :type result_field: str | None
        :param algorithm_params: Additional algorithm parameters.
        :type algorithm_params: dict | None
        :return: Pregel job ID.
        :rtype: int
        :raise arango.exceptions.PregelJobCreateError: If create fails.
        """
        data: Json = {"algorithm": algorithm, "graphName": graph}

        # Work on a shallow copy so the caller's dictionary is not mutated;
        # otherwise a dict reused across calls would silently carry over the
        # "store"/"maxGSS"/... values injected by a previous call.
        params: Json = {} if algorithm_params is None else dict(algorithm_params)

        if store is not None:
            params["store"] = store
        if max_gss is not None:
            params["maxGSS"] = max_gss
        if thread_count is not None:
            params["parallelism"] = thread_count
        if async_mode is not None:
            params["async"] = async_mode
        if result_field is not None:
            params["resultField"] = result_field
        # Only send "params" when there is at least one entry.
        if params:
            data["params"] = params

        request = Request(method="post", endpoint="/_api/control_pregel", data=data)

        def response_handler(resp: Response) -> int:
            """Convert the response body to an int or raise on failure."""
            if resp.is_success:
                return int(resp.body)
            raise PregelJobCreateError(resp, request)

        return self._execute(request, response_handler)

    def delete_job(self, job_id: int) -> Result[bool]:
        """Delete a Pregel job.

        :param job_id: Pregel job ID.
        :type job_id: int
        :return: True if Pregel job was deleted successfully.
        :rtype: bool
        :raise arango.exceptions.PregelJobDeleteError: If delete fails.
        """
        request = Request(method="delete", endpoint=f"/_api/control_pregel/{job_id}")

        def response_handler(resp: Response) -> bool:
            """Return True on success or raise on an unsuccessful response."""
            if resp.is_success:
                return True
            raise PregelJobDeleteError(resp, request)

        return self._execute(request, response_handler)