`
cloudtech
  • 浏览: 4595424 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
文章分类
社区版块
存档分类
最新评论

使用OTP原理构建一个非阻塞的TCP服务器

 
阅读更多

原文地址:http://www.iucai.com/?paged=8

Erlang OTP设计原理已经被shiningray兄翻译透了。请参见。http://erlang.shiningray.cn/otp-design-principles/index.html

这里翻译了一篇余锋老大和lzy.je老大推荐的文章,闲话不说,奉上。

使用OTP原理构建一个非阻塞的TCP服务器

原文网址:(打不开的同学请自觉)

http://www.trapexit.org.nyud.net:8080/Building_a_Non-blocking_TCP_server_using_OTP_principles

Author:

Serge Aleynikov <saleyn at gmail.com>

Tranlator:

David <GodwitNow@Gmail.com>

Overview:

该文的读者需要熟悉gen_server和gen_fsm行为,使用gen_tcp模块进行TCP socket通信,主动和被动的socket模式,以及OTP监视原理。

OTP为构建可靠的应用提供了一个方便的框架。该框架的实现,部分是通过抽象共同的功能到一个可服用的行为集合里面,例如链接到OTP监视层的gen_server和gen_fsm。

有几个比较著名的TCP服务器设计。这里我们要讨论的包括,一个进程监听一个客户端连接并且为每个连接的客户端产生一个FSM进程。如今在OTP里面可以通过gen_tcp模块来支持TCP链接,但是,没有标准的使用OTP准则的行为来构建非阻塞的TCP服务器。关于非阻塞(non-blocking),我们是指监听进程和处理客户端的FSM(有限状态机)不能被阻塞,时刻准备对到来的控制消息(例如更改系统配置,重启请求,等等)做出反应而不会引起超时。注意在Erlang中阻塞(blocking)意思是阻塞一个Erlang进程而不是虚拟机的操作系统进程。

本教程我们会展示如何使用gen_server行为来构建一个非阻塞的TCP服务器,用gen_fsm行为来提供流控制,并且完全符合OTP应用的设计理念。

对于OTP框架不熟悉的读者,请阅读Joe Armstrong的教程:how to buildA Fault-tolerant Serverusing blockinggen_tcp:connect/3andgen_tcp:accept/1calls without involving OTP。

本教程灵感来自于Erlang问题列表的几个主题,这个几个主题涉及到构建一个非阻塞异步TCP服务器的方法。

Server Design

我们服务器的设计会包含:主程序的监视进程tcp_server_app,该监视进程带有one_for_one重启策略,以及两个子规范。第一个是用gen_server行为实现的监听进程,该进程会等待来在客户端socket连接的异步通知。第二个是另一个监视进程tcp_client_sup,负责启动处理客户端的FSM(有限状态机)以及通过标准的SASL错误报告来记录异常连接。

为了简化该教程,处理客户端的有限状态机(tcp_echo_fsm)会实现一个echo server,该server会响应客户端的请求给客户端。

 +----------------+
 | tcp_server_app |
 +--------+-------+
 | (one_for_one)
 +----------------+---------+
 | |
 +-------+------+ +-------+--------+
 | tcp_listener | + tcp_client_sup |
 +--------------+ +-------+--------+
 | (simple_one_for_one)
 +-----|---------+
 +-------|--------+|
 +--------+-------+|+
 | tcp_echo_fsm |+
 +----------------+

Application and Supervisor behaviours

为了建立一个OTP应用程序,我们需要构建实现应用的模块,以及监视程序的回调函数。传统的这些功能都是在不同的模块里面实现的,考虑到简洁性,我们将它们组合在一个模块里面。

作为一个额外的功能,我们实现一个get_app_env函数,该函数描述了如何处理配置选项以及虚拟机启动时的命令行选项。

两个init/1函数实例用于检视层次的两层。因为针对每个监视树,我们需要两个不同的重启策略,因此,我们在不同的层里实现它们。

在应用程序启动的时候,回调函数tcp_server_app:start/2调用supervisor:start_link/2,而该函数会通过调用tcp_server_app:init( [Port, Module] )回调函数来创建应用程序的监视进程。这个监视进程创建一个tcp_listener进程以及一个子监视进程tcp_client_sup,该进程用于创建客户端链接进程。Init函数里面的模块参数正是处理客户端连接的有限状态机的名字(本例中为tcp_echo_fsm)。

TCP Server Application (tcp_server_app.erl)

-module(tcp_server_app).

-author(‘saleyn@gmail.com’).

-behaviour(application).

%% Internal API

-export([start_client/0]).

%% Application and Supervisor callbacks

-export([start/2, stop/1, init/1]).

-define(MAX_RESTART,5).

-define(MAX_TIME,60).

-define(DEF_PORT,2222).

%% A startup function for spawning new client connection handling FSM.

%% To be called by the TCP listener process.

start_client() ->

supervisor:start_child(tcp_client_sup, []).

%%———————————————————————-

%% Application behaviour callbacks

%%———————————————————————-

start(_Type, _Args) ->

ListenPort = get_app_env(listen_port,?DEF_PORT),

supervisor:start_link({local,?MODULE},?MODULE, [ListenPort, tcp_echo_fsm]).

stop(_S) ->

ok.

%%———————————————————————-

%% Supervisor behaviour callbacks

%%———————————————————————-

init([Port, Module]) ->

{ok,

{_SupFlags = {one_for_one,?MAX_RESTART,?MAX_TIME},

[

% TCP Listener

{tcp_server_sup,% Id= internal id

{tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A}

permanent,% Restart= permanent | transient | temporary

2000,% Shutdown = brutal_kill | int() >= 0 | infinity

worker,% Type= worker | supervisor

[tcp_listener]% Modules= [Module] | dynamic

},

% Client instance supervisor

{tcp_client_sup,

{supervisor,start_link,[{local, tcp_client_sup},?MODULE, [Module]]},

permanent,% Restart= permanent | transient | temporary

infinity,% Shutdown = brutal_kill | int() >= 0 | infinity

supervisor,% Type= worker | supervisor

[]% Modules= [Module] | dynamic

}

]

}

};

init([Module]) ->

{ok,

{_SupFlags = {simple_one_for_one,?MAX_RESTART,?MAX_TIME},

[

% TCP Client

{undefined,% Id= internal id

{Module,start_link,[]},% StartFun = {M, F, A}

temporary,% Restart= permanent | transient | temporary

2000,% Shutdown = brutal_kill | int() >= 0 | infinity

worker,% Type= worker | supervisor

[]% Modules= [Module] | dynamic

}

]

}

}.

%%———————————————————————-

%% Internal functions

%%———————————————————————-

get_app_env(Opt, Default) ->

case application:get_env(application:get_application(), Opt) of

{ok, Val} -> Val;

_ ->

case init:get_argument(Opt) of

[[Val | _]] -> Val;

error-> Default

end

end.

Listener Process

Gen_tcp模块的一个缺点就是它只向一个阻塞的accept调用提供接口。这导致大部分的开发者在实现一个TCP Server的时候,构建一个自定义进程,连接到一个使用proc_lib的监视器或者提供一些专有的设计。

检查prim_inet模块揭露了一个有趣的事实,实际的调用inet驱动来accept一个客户端socket是异步的。然而,这是一个未写入文档的性能,这意味着OTP小组可以自由更改这些实现。我们会在构建我们的服务器的时候来利用这个功能。

监听进程是用gen_server行为来实现的。

TCP Listener Process (tcp_listener.erl)

-module(tcp_listener).

-author(‘saleyn@gmail.com’).

-behaviour(gen_server).

%% External API

-export([start_link/2]).

%% gen_server callbacks

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,

code_change/3]).

-record(state, {

listener,% Listening socket

acceptor,% Asynchronous acceptor’s internal reference

module% FSM handling module

}).

%%——————————————————————–

%% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason}

%

%% @doc Called by a supervisor to start the listening process.

%% @end

%%———————————————————————-

start_link(Port, Module) when is_integer(Port), is_atom(Module) ->

gen_server:start_link({local,?MODULE},?MODULE, [Port, Module], []).

%%%————————————————————————

%%% Callback functions from gen_server

%%%————————————————————————

%%———————————————————————-

%% @spec (Port::integer()) -> {ok, State}|

%%{ok, State, Timeout}|

%%ignore|

%%{stop, Reason}

%%

%% @doc Called by gen_server framework at process startup.

%%Create listening socket.

%% @end

%%———————————————————————-

init([Port, Module]) ->

process_flag(trap_exit, true),

Opts = [binary, {packet, 2}, {reuseaddr, true},

{keepalive, true}, {backlog, 30}, {active, false}],

case gen_tcp:listen(Port, Opts) of

{ok, Listen_socket} ->

%%Create first accepting process

{ok, Ref} = prim_inet:async_accept(Listen_socket, -1),

{ok, #state{listener = Listen_socket,

acceptor = Ref,

module= Module}};

{error, Reason} ->

{stop, Reason}

end.

%%————————————————————————-

%% @spec (Request, From, State) -> {reply, Reply, State}|

%%{reply, Reply, State, Timeout} |

%%{noreply, State}|

%%{noreply, State, Timeout}|

%%{stop, Reason, Reply, State}|

%%{stop, Reason, State}

%% @doc Callback for synchronous server calls.If `{stop, …}’ tuple

%%is returned, the server is stopped and `terminate/2′ is called.

%% @end

%% @private

%%————————————————————————-

handle_call(Request, _From, State) ->

{stop, {unknown_call, Request}, State}.

%%————————————————————————-

%% @spec (Msg, State) ->{noreply, State}|

%%{noreply, State, Timeout} |

%%{stop, Reason, State}

%% @doc Callback for asyncrous server calls.If `{stop, …}’ tuple

%%is returned, the server is stopped and `terminate/2′ is called.

%% @end

%% @private

%%————————————————————————-

handle_cast(_Msg, State) ->

{noreply, State}.

%%————————————————————————-

%% @spec (Msg, State) ->{noreply, State}|

%%{noreply, State, Timeout} |

%%{stop, Reason, State}

%% @doc Callback for messages sent directly to server’s mailbox.

%%If `{stop, …}’ tuple is returned, the server is stopped and

%%`terminate/2′ is called.

%% @end

%% @private

%%————————————————————————-

handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},

#state{listener=ListSock, acceptor=Ref, module=Module} = State) ->

try

case set_sockopt(ListSock, CliSocket) of

ok-> ok;

{error, Reason} -> exit({set_sockopt, Reason})

end,

%% New client connected – spawn a new process using the simple_one_for_one

%% supervisor.

{ok, Pid} = tcp_server_app:start_client(),

gen_tcp:controlling_process(CliSocket, Pid),

%% Instruct the new FSM that it owns the socket.

Module:set_socket(Pid, CliSocket),

%% Signal the network driver that we are ready to accept another connection

case prim_inet:async_accept(ListSock, -1) of

{ok,NewRef} -> ok;

{error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})

end,

{noreply, State#state{acceptor=NewRef}}

catch exit:Why ->

error_logger:error_msg(“Error in async accept: ~p.\n”, [Why]),

{stop, Why, State}

end;

handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) ->

error_logger:error_msg(“Error in socket acceptor: ~p.\n”, [Error]),

{stop, Error, State};

handle_info(_Info, State) ->

{noreply, State}.

%%————————————————————————-

%% @spec (Reason, State) -> any

%% @docCallback executed on server shutdown. It is only invoked if

%%`process_flag(trap_exit, true)’ is set by the server process.

%%The return value is ignored.

%% @end

%% @private

%%————————————————————————-

terminate(_Reason, State) ->

gen_tcp:close(State#state.listener),

ok.

%%————————————————————————-

%% @spec (OldVsn, State, Extra) -> {ok, NewState}

%% @docConvert process state when code is changed.

%% @end

%% @private

%%————————————————————————-

code_change(_OldVsn, State, _Extra) ->

{ok, State}.

%%%————————————————————————

%%% Internal functions

%%%————————————————————————

%% Taken from prim_inet.We are merely copying some socket options from the

%% listening socket to the new client socket.

set_sockopt(ListSock, CliSocket) ->

true = inet_db:register_socket(CliSocket, inet_tcp),

case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of

{ok, Opts} ->

case prim_inet:setopts(CliSocket, Opts) of

ok-> ok;

Error -> gen_tcp:close(CliSocket), Error

end;

Error ->

gen_tcp:close(CliSocket), Error

end.

在该模块里面,init/1函数调用两个参数-TCP Listener启动的时候所需要的端口号和处理客户端连接的协议模块名称。初始化函数打开了使用被动模式({active, false})打开了一个监听socket。这样做,我们就可以对从已连接socket上接收到的数据进行流控制,这个已连接的socket会从监听的socket继承这个模式选项。

这段代码最有趣的部分是prim_inet:async_accept/2调用以及异步消息进行处理的inet_async。为了让其工作我们还需要复制一些OTP内部代码,这些代码封装在set_sockopt/2函数中,用来处理用inet数据库来注册socket,以及复制些设置选项到客户端socket。

一旦一个客户端socket连接上,inet driver会使用{inet_async, ListSock, Ref, {ok, CliSocket}}消息来通知listening进程。此时,我们会实例化一个客户端socket处理进程,并且设置它的CliSocket的所有者。

Client Socket Handling Process

Tcp listener是一个通用的实现,而tcp_echo_fsm则仅仅是一个简单的有限状态机,用来描述如何写TCP服务器。这个模块需要输出两个函数–一个用于tcp_client_sup监视器的start_link/0以及一个用于监听进程的set_socket/2,该函数用来通知处理客户端连接的有限状态机进程,此刻它是socket的所有者,并且通过设置{active, once}或{active, true}选项能够开始接收消息。

我们想强调在listening进程和处理客户端连接的有限状态机之间使用的异步模式,来避免可能的消息丢失,因为将来自socket的消息派发给错误的listening进程。拥有listening socket的进程通过{active, false}来打开socket。在接收完客户端的socket之后,该socket从listener继承它的socket选项(包括{active, false}),通过调用gen_tcp:controlling_process/2将socket的所有权移交给新产生的处理客户端连接的有限状态机,以及通过调用Module:set_socket/2来通知有限状态机可以从socket接收消息了。被TCP发送的数据会一直停留在socket的buffer里,直到有限状态机进程通过调用inet:setopts(Socket, [{active, once}])设置socket为主动模式来启动消息传递。

当socket的所有权被移交给处于'WAIT_FOR_SOCKET'状态的FSM后,FSM设置{active, once}选项来让inet driver一次发送给它一个TCP消息。这就是OTP方式来保持流控制,避免进程消息队列被TCP数据淹没,以及防止一个快生产者慢消费者情况引起系统崩溃。

FSM的状态被在tcp_echo_fsm模块里面的特殊函数来实现,该模块对被包在单引号里面大些状态名进行转换。FSM有两个状态组成,'WAIT_FOR_SOCKET'是初始状态,此时FSM正在等待被分配socket的所有权;'WAIT_FOR_DATA'是代表等待来自客户端TCP消息的状态。在这个状态,FSM还会处理一个特殊的超时消息,用来表示没有来自客户端的活动,引起该进程停止,并且关闭客户端连接。

TCP Client Socket Handling FSM (tcp_echo_fsm.erl)

-module(tcp_echo_fsm).

-author(‘saleyn@gmail.com’).

-behaviour(gen_fsm).

-export([start_link/0, set_socket/2]).

%% gen_fsm callbacks

-export([init/1, handle_event/3,

handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

%% FSM States

-export([

'WAIT_FOR_SOCKET'/2,

'WAIT_FOR_DATA'/2

]).

-record(state, {

socket,% client socket

addr% client address

}).

-define(TIMEOUT, 120000).

%%%————————————————————————

%%% API

%%%————————————————————————

%%————————————————————————-

%% @spec (Socket) -> {ok,Pid} | ignore | {error,Error}

%% @doc To be called by the supervisor in order to start the server.

%%If init/1 fails with Reason, the function returns {error,Reason}.

%%If init/1 returns {stop,Reason} or ignore, the process is

%%terminated and the function returns {error,Reason} or ignore,

%%respectively.

%% @end

%%————————————————————————-

start_link() ->

gen_fsm:start_link(?MODULE, [], []).

set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->

gen_fsm:send_event(Pid, {socket_ready, Socket}).

%%%————————————————————————

%%% Callback functions from gen_server

%%%————————————————————————

%%————————————————————————-

%% Func: init/1

%% Returns: {ok, StateName, StateData}|

%%{ok, StateName, StateData, Timeout} |

%%ignore|

%%{stop, StopReason}

%% @private

%%————————————————————————-

init([]) ->

process_flag(trap_exit, true),

{ok, ‘WAIT_FOR_SOCKET’, #state{}}.

%%————————————————————————-

%% Func: StateName/2

%% Returns: {next_state, NextStateName, NextStateData}|

%%{next_state, NextStateName, NextStateData, Timeout} |

%%{stop, Reason, NewStateData}

%% @private

%%————————————————————————-

‘WAIT_FOR_SOCKET’({socket_ready, Socket}, State) when is_port(Socket) ->

% Now we own the socket

inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),

{ok, {IP, _Port}} = inet:peername(Socket),

{next_state, ‘WAIT_FOR_DATA’, State#state{socket=Socket, addr=IP},?TIMEOUT};

‘WAIT_FOR_SOCKET’(Other, State) ->

error_logger:error_msg(“State: ‘WAIT_FOR_SOCKET’. Unexpected message: ~p\n”, [Other]),

%% Allow to receive async messages

{next_state, ‘WAIT_FOR_SOCKET’, State}.

%% Notification event coming from client

‘WAIT_FOR_DATA’({data, Data}, #state{socket=S} = State) ->

ok = gen_tcp:send(S, Data),

{next_state, ‘WAIT_FOR_DATA’, State,?TIMEOUT};

‘WAIT_FOR_DATA’(timeout, State) ->

error_logger:error_msg(“~p Client connection timeout – closing.\n”, [self()]),

{stop, normal, State};

‘WAIT_FOR_DATA’(Data, State) ->

io:format(“~p Ignoring data: ~p\n”, [self(), Data]),

{next_state, ‘WAIT_FOR_DATA’, State,?TIMEOUT}.

%%————————————————————————-

%% Func: handle_event/3

%% Returns: {next_state, NextStateName, NextStateData}|

%%{next_state, NextStateName, NextStateData, Timeout} |

%%{stop, Reason, NewStateData}

%% @private

%%————————————————————————-

handle_event(Event, StateName, StateData) ->

{stop, {StateName, undefined_event, Event}, StateData}.

%%————————————————————————-

%% Func: handle_sync_event/4

%% Returns: {next_state, NextStateName, NextStateData}|

%%{next_state, NextStateName, NextStateData, Timeout}|

%%{reply, Reply, NextStateName, NextStateData}|

%%{reply, Reply, NextStateName, NextStateData, Timeout} |

%%{stop, Reason, NewStateData}|

%%{stop, Reason, Reply, NewStateData}

%% @private

%%————————————————————————-

handle_sync_event(Event, _From, StateName, StateData) ->

{stop, {StateName, undefined_event, Event}, StateData}.

%%————————————————————————-

%% Func: handle_info/3

%% Returns: {next_state, NextStateName, NextStateData}|

%%{next_state, NextStateName, NextStateData, Timeout} |

%%{stop, Reason, NewStateData}

%% @private

%%————————————————————————-

handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->

% Flow control: enable forwarding of next TCP message

inet:setopts(Socket, [{active, once}]),

?MODULE:StateName({data, Bin}, StateData);

handle_info({tcp_closed, Socket}, _StateName,

#state{socket=Socket, addr=Addr} = StateData) ->

error_logger:info_msg(“~p Client ~p disconnected.\n”, [self(), Addr]),

{stop, normal, StateData};

handle_info(_Info, StateName, StateData) ->

{noreply, StateName, StateData}.

%%————————————————————————-

%% Func: terminate/3

%% Purpose: Shutdown the fsm

%% Returns: any

%% @private

%%————————————————————————-

terminate(_Reason, _StateName, #state{socket=Socket}) ->

(catch gen_tcp:close(Socket)),

ok.

%%————————————————————————-

%% Func: code_change/4

%% Purpose: Convert process state when code is changed

%% Returns: {ok, NewState, NewStateData}

%% @private

%%————————————————————————-

code_change(_OldVsn, StateName, StateData, _Extra) ->

{ok, StateName, StateData}.

Application File

构建一个OTP应用的另一个必须的部分是创建一个包含应用程序名字,版本号,启动模块和环境的应用程序文件。

Application File (tcp_server.app)

{application, tcp_server,

[

{description, "Demo TCP server"},

{vsn, "1.0"},

{id, "tcp_server"},

{modules,[tcp_listener, tcp_echo_fsm]},

{registered,[tcp_server_sup, tcp_listener]},

{applications, [kernel, stdlib]},

%%

%% mod: Specify the module name to start the application, plus args

%%

{mod, {tcp_server_app, []}},

{env, []}

]

}.

Compiling

为该应用程序创建如下的目录结构

Create the following directory structure for this application:

./tcp_server
 ./tcp_server/ebin/
 ./tcp_server/ebin/tcp_server.app
 ./tcp_server/src/tcp_server_app.erl
 ./tcp_server/src/tcp_listener.erl
 ./tcp_server/src/tcp_echo_fsm.erl
 $ cd tcp_server/src
 $ for f in tcp*.erl; do erlc +debug_info -o ../ebin $f

Running

我们会带有SASL支持来启动Erlang Shell。这样我们就能查看所有的进程和我们TCP应用的错误报告。同样,我们还会启动appmon应用,来可视化的检查监视层级。

$ cd ../ebin
 $ erl -boot start_sasl
 ...
 1> appmon:start().
 {ok,<0.44.0>}
 2> application:start(tcp_server).
 ok

现在点击在appmon窗口里面的tcp_server的按钮,以显示该tcp_server应用程序的监视层级。

3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]).
 {ok,#Port<0.150>}

上一步初始化一个新的客户端连接到echo server。

4> gen_tcp:send(S,<<"hello">>).
 ok
 5> f(M), receive M -> M end.
 {tcp,#Port<0.150>,"hello"}

我们已证实echo server如我们所期望的工作。现在让我们尝试crash server上的客户端连接,然后发现监视器程序会在屏幕上产生一个错误报告。

6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup).
 [{undefined,<0.64.0>,worker,[]}]
 7> exit(Pid,kill).
 true
 =SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 ===
 Supervisor: {local,tcp_client_sup}
 Context: child_terminated
 Reason: killed
 Offender: [{pid,<0.77.0>},
 {name,undefined},
 {mfa,{tcp_echo_fsm,start_link,[]}},
 {restart_type,temporary},
 {shutdown,2000},
 {child_type,worker}]

注意,如果你建立很多连接来对该server进行压力测试,在打开的文件描述符达到系统设置的极限值之后,listener进程可能会在接收新的连接的时候失败。这种情况下你会看到如下错误:

 "too many open files"

如果你是运行在Linux/UNIX上,google一个解决方案吧(最终归结为通过设置“ulimit -n …”选项来增加每个进程的限制)。

Conclusion

OTP提供了构建非阻塞的TCP服务器所需要的东西。该教程展示了如何使用标准OTP行为创建一个带有流控制的简单的TCPserver。作为一个练习,读者应该尝试将通用的非阻塞TCP服务器功能抽象为一个独立的行为。



分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics