1from ..utils import base64, unbase64, is_str 2from .connectiontypes import Connection, PageInfo, Edge 3 4 5def connection_from_list(data, args=None, **kwargs): 6 """ 7 A simple function that accepts an array and connection arguments, and returns 8 a connection object for use in GraphQL. It uses array offsets as pagination, 9 so pagination will only work if the array is static. 10 """ 11 _len = len(data) 12 return connection_from_list_slice( 13 data, 14 args, 15 slice_start=0, 16 list_length=_len, 17 list_slice_length=_len, 18 **kwargs 19 ) 20 21 22def connection_from_promised_list(data_promise, args=None, **kwargs): 23 """ 24 A version of `connectionFromArray` that takes a promised array, and returns a 25 promised connection. 26 """ 27 return data_promise.then(lambda data: connection_from_list(data, args, **kwargs)) 28 29 30def connection_from_list_slice(list_slice, args=None, connection_type=None, 31 edge_type=None, pageinfo_type=None, 32 slice_start=0, list_length=0, list_slice_length=None): 33 """ 34 Given a slice (subset) of an array, returns a connection object for use in 35 GraphQL. 36 This function is similar to `connectionFromArray`, but is intended for use 37 cases where you know the cardinality of the connection, consider it too large 38 to materialize the entire array, and instead wish pass in a slice of the 39 total result large enough to cover the range specified in `args`. 40 """ 41 connection_type = connection_type or Connection 42 edge_type = edge_type or Edge 43 pageinfo_type = pageinfo_type or PageInfo 44 45 args = args or {} 46 47 before = args.get('before') 48 after = args.get('after') 49 first = args.get('first') 50 last = args.get('last') 51 if list_slice_length is None: 52 list_slice_length = len(list_slice) 53 slice_end = slice_start + list_slice_length 54 before_offset = get_offset_with_default(before, list_length) 55 after_offset = get_offset_with_default(after, -1) 56 57 start_offset = max( 58 slice_start - 1, 59 after_offset, 60 -1 61 ) + 1 62 end_offset = min( 63 slice_end, 64 before_offset, 65 list_length 66 ) 67 if isinstance(first, int): 68 end_offset = min( 69 end_offset, 70 start_offset + first 71 ) 72 if isinstance(last, int): 73 start_offset = max( 74 start_offset, 75 end_offset - last 76 ) 77 78 # If supplied slice is too large, trim it down before mapping over it. 79 _slice = list_slice[ 80 max(start_offset - slice_start, 0): 81 list_slice_length - (slice_end - end_offset) 82 ] 83 edges = [ 84 edge_type( 85 node=node, 86 cursor=offset_to_cursor(start_offset + i) 87 ) 88 for i, node in enumerate(_slice) 89 ] 90 91 first_edge_cursor = edges[0].cursor if edges else None 92 last_edge_cursor = edges[-1].cursor if edges else None 93 lower_bound = after_offset + 1 if after else 0 94 upper_bound = before_offset if before else list_length 95 96 return connection_type( 97 edges=edges, 98 page_info=pageinfo_type( 99 start_cursor=first_edge_cursor, 100 end_cursor=last_edge_cursor, 101 has_previous_page=isinstance(last, int) and start_offset > lower_bound, 102 has_next_page=isinstance(first, int) and end_offset < upper_bound 103 ) 104 ) 105 106 107PREFIX = 'arrayconnection:' 108 109 110def connection_from_promised_list_slice(data_promise, args=None, **kwargs): 111 return data_promise.then( 112 lambda data: connection_from_list_slice(data, args, **kwargs)) 113 114 115def offset_to_cursor(offset): 116 """ 117 Creates the cursor string from an offset. 118 """ 119 return base64(PREFIX + str(offset)) 120 121 122def cursor_to_offset(cursor): 123 """ 124 Rederives the offset from the cursor string. 125 """ 126 try: 127 return int(unbase64(cursor)[len(PREFIX):]) 128 except Exception: 129 return None 130 131 132def cursor_for_object_in_connection(data, _object): 133 """ 134 Return the cursor associated with an object in an array. 135 """ 136 if _object not in data: 137 return None 138 139 offset = data.index(_object) 140 return offset_to_cursor(offset) 141 142 143def get_offset_with_default(cursor=None, default_offset=0): 144 """ 145 Given an optional cursor and a default offset, returns the offset 146 to use; if the cursor contains a valid offset, that will be used, 147 otherwise it will be the default. 148 """ 149 if not is_str(cursor): 150 return default_offset 151 152 offset = cursor_to_offset(cursor) 153 try: 154 return int(offset) 155 except Exception: 156 return default_offset 157