1%% 2%% Licensed to the Apache Software Foundation (ASF) under one 3%% or more contributor license agreements. See the NOTICE file 4%% distributed with this work for additional information 5%% regarding copyright ownership. The ASF licenses this file 6%% to you under the Apache License, Version 2.0 (the 7%% "License"); you may not use this file except in compliance 8%% with the License. You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, 13%% software distributed under the License is distributed on an 14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15%% KIND, either express or implied. See the License for the 16%% specific language governing permissions and limitations 17%% under the License. 18%% 19 20-module(thrift_server). 21 22-behaviour(gen_server). 23 24%% API 25-export([start_link/3, stop/1, take_socket/2]). 26 27%% gen_server callbacks 28-export([init/1, handle_call/3, handle_cast/2, handle_info/2, 29 terminate/2, code_change/3]). 30 31-define(SERVER, ?MODULE). 32 33-record(state, {listen_socket, acceptor_ref, service, handler}). 34 35%%==================================================================== 36%% API 37%%==================================================================== 38%%-------------------------------------------------------------------- 39%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} 40%% Description: Starts the server 41%%-------------------------------------------------------------------- 42start_link(Port, Service, HandlerModule) when is_integer(Port), is_atom(HandlerModule) -> 43 gen_server:start_link({local, ?SERVER}, ?MODULE, {Port, Service, HandlerModule}, []). 44 45%%-------------------------------------------------------------------- 46%% Function: stop(Pid) -> ok, {error, Reason} 47%% Description: Stops the server. 48%%-------------------------------------------------------------------- 49stop(Pid) when is_pid(Pid) -> 50 gen_server:call(Pid, stop). 51 52 53take_socket(Server, Socket) -> 54 gen_server:call(Server, {take_socket, Socket}). 55 56 57%%==================================================================== 58%% gen_server callbacks 59%%==================================================================== 60 61%%-------------------------------------------------------------------- 62%% Function: init(Args) -> {ok, State} | 63%% {ok, State, Timeout} | 64%% ignore | 65%% {stop, Reason} 66%% Description: Initiates the server 67%%-------------------------------------------------------------------- 68init({Port, Service, Handler}) -> 69 {ok, Socket} = gen_tcp:listen(Port, 70 [binary, 71 {packet, 0}, 72 {active, false}, 73 {nodelay, true}, 74 {reuseaddr, true}]), 75 {ok, Ref} = prim_inet:async_accept(Socket, -1), 76 {ok, #state{listen_socket = Socket, 77 acceptor_ref = Ref, 78 service = Service, 79 handler = Handler}}. 80 81%%-------------------------------------------------------------------- 82%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | 83%% {reply, Reply, State, Timeout} | 84%% {noreply, State} | 85%% {noreply, State, Timeout} | 86%% {stop, Reason, Reply, State} | 87%% {stop, Reason, State} 88%% Description: Handling call messages 89%%-------------------------------------------------------------------- 90handle_call(stop, _From, State) -> 91 {stop, stopped, ok, State}; 92 93handle_call({take_socket, Socket}, {FromPid, _Tag}, State) -> 94 Result = gen_tcp:controlling_process(Socket, FromPid), 95 {reply, Result, State}. 96 97%%-------------------------------------------------------------------- 98%% Function: handle_cast(Msg, State) -> {noreply, State} | 99%% {noreply, State, Timeout} | 100%% {stop, Reason, State} 101%% Description: Handling cast messages 102%%-------------------------------------------------------------------- 103handle_cast(_Msg, State) -> 104 {noreply, State}. 105 106%%-------------------------------------------------------------------- 107%% Function: handle_info(Info, State) -> {noreply, State} | 108%% {noreply, State, Timeout} | 109%% {stop, Reason, State} 110%% Description: Handling all non call/cast messages 111%%-------------------------------------------------------------------- 112handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}}, 113 State = #state{listen_socket = ListenSocket, 114 acceptor_ref = Ref, 115 service = Service, 116 handler = Handler}) -> 117 case set_sockopt(ListenSocket, ClientSocket) of 118 ok -> 119 %% New client connected - start processor 120 start_processor(ClientSocket, Service, Handler), 121 {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1), 122 {noreply, State#state{acceptor_ref = NewRef}}; 123 {error, Reason} -> 124 error_logger:error_msg("Couldn't set socket opts: ~p~n", 125 [Reason]), 126 {stop, Reason, State} 127 end; 128 129handle_info({inet_async, _ListenSocket, _Ref, Error}, State) -> 130 error_logger:error_msg("Error in acceptor: ~p~n", [Error]), 131 {stop, Error, State}; 132 133handle_info(_Info, State) -> 134 {noreply, State}. 135 136%%-------------------------------------------------------------------- 137%% Function: terminate(Reason, State) -> void() 138%% Description: This function is called by a gen_server when it is about to 139%% terminate. It should be the opposite of Module:init/1 and do any necessary 140%% cleaning up. When it returns, the gen_server terminates with Reason. 141%% The return value is ignored. 142%%-------------------------------------------------------------------- 143terminate(_Reason, _State) -> 144 ok. 145 146%%-------------------------------------------------------------------- 147%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} 148%% Description: Convert process state when code is changed 149%%-------------------------------------------------------------------- 150code_change(_OldVsn, State, _Extra) -> 151 {ok, State}. 152 153%%-------------------------------------------------------------------- 154%%% Internal functions 155%%-------------------------------------------------------------------- 156set_sockopt(ListenSocket, ClientSocket) -> 157 true = inet_db:register_socket(ClientSocket, inet_tcp), 158 case prim_inet:getopts(ListenSocket, 159 [active, nodelay, keepalive, delay_send, priority, tos]) of 160 {ok, Opts} -> 161 case prim_inet:setopts(ClientSocket, Opts) of 162 ok -> ok; 163 Error -> gen_tcp:close(ClientSocket), 164 Error 165 end; 166 Error -> 167 gen_tcp:close(ClientSocket), 168 Error 169 end. 170 171start_processor(Socket, Service, Handler) -> 172 Server = self(), 173 174 ProtoGen = fun() -> 175 % Become the controlling process 176 ok = take_socket(Server, Socket), 177 {ok, SocketTransport} = thrift_socket_transport:new(Socket), 178 {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), 179 {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), 180 {ok, Protocol} 181 end, 182 183 spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]). 184