193 lines
		
	
	
	
		
			6 KiB
		
	
	
	
		
			Erlang
		
	
	
	
	
	
			
		
		
	
	
			193 lines
		
	
	
	
		
			6 KiB
		
	
	
	
		
			Erlang
		
	
	
	
	
	
| -module(stomp_worker).
 | |
| -behaviour(gen_server).
 | |
| 
 | |
| %% API.
 | |
| -export([start_link/0]).
 | |
| 
 | |
| %% gen_server.
 | |
| -export([init/1]).
 | |
| -export([handle_call/3]).
 | |
| -export([handle_cast/2]).
 | |
| -export([handle_info/2]).
 | |
| -export([terminate/2]).
 | |
| -export([code_change/3]).
 | |
| 
 | |
| %% Testing
 | |
| -compile(export_all).
 | |
| 
 | |
| -include("stomp.hrl").
 | |
| 
 | |
| %% State of a stomp_worker
 | |
| -record(state, {connection    :: port(),
 | |
|                 next_sub      :: sub_id(),
 | |
|                 subscriptions :: #{ destination() => sub_id() },
 | |
|                 subscribers   :: #{ destination() => pid() }
 | |
|                }).
 | |
| -type state() :: #state{}.
 | |
| 
 | |
| %% API implementation
 | |
| 
 | |
| -spec start_link() -> {ok, pid()}.
 | |
| start_link() ->
 | |
|     {ok, Pid} = gen_server:start_link(?MODULE, [], []),
 | |
|     register(?MODULE, Pid),
 | |
|     {ok, Pid}.
 | |
| 
 | |
| %% gen_server implementation
 | |
| 
 | |
| -spec init(any()) -> {ok, state()}.
 | |
| init(_Args) ->
 | |
|     %% Fetch configuration from app config
 | |
|     {ok, Host} = application:get_env(stomp, host),
 | |
|     Port = application:get_env(stomp, port, 61613),
 | |
|     Login = application:get_env(stomp, login),
 | |
|     Pass = application:get_env(stomp, passcode),
 | |
| 
 | |
|     %% Catch exit signals from linked processes (subscribers dying)
 | |
|     process_flag(trap_exit, true),
 | |
| 
 | |
|     %% Establish connection
 | |
|     {ok, Conn} = connect(Host, Port, Login, Pass),
 | |
| 
 | |
|     {ok, #state{connection = Conn,
 | |
|                 next_sub   = 0,
 | |
|                 subscriptions = #{},
 | |
|                 subscribers = #{}}}.
 | |
| 
 | |
| %% Handle subscription calls
 | |
| handle_call({subscribe, Dest, Ack}, From, State) ->
 | |
|     %% Subscribe to new destination
 | |
|     SubId = State#state.next_sub,
 | |
|     ok = subscribe(State#state.connection, SubId, Dest, Ack),
 | |
| 
 | |
|     %% Add subscription and subscriber to state
 | |
|     Subscriptions = maps:put(SubId, Dest, State#state.subscriptions),
 | |
|     Subscribers = maps:put(SubId, From, State#state.subscribers),
 | |
|     NextSub = SubId + 1,
 | |
|     NewState = State#state{subscriptions = Subscriptions,
 | |
|                            subscribers = Subscribers,
 | |
|                            next_sub = NextSub },
 | |
| 
 | |
|     {reply, {ok, SubId}, NewState};
 | |
| handle_call(_Req, _From, State) ->
 | |
|     {reply, ignored, State}.
 | |
| 
 | |
| handle_info({tcp, Conn, Frame}, State) when Conn =:= State#state.connection ->
 | |
|     handle_frame(Frame, State);
 | |
| handle_info(_Msg, State) ->
 | |
|     {noreply, State}.
 | |
| 
 | |
| %% Unused gen_server callbacks
 | |
| 
 | |
| handle_cast(_Msg, State) ->
 | |
|     {noreply, State}.
 | |
| 
 | |
| terminate(_Reason, _State) ->
 | |
|     ok.
 | |
| 
 | |
| code_change(_OldVsn, State, _Extra) ->
 | |
|     {ok, State}.
 | |
| 
 | |
| %% Private functions
 | |
| 
 | |
| -spec connect(list(), integer(), any(), any()) -> {ok, port()}.
 | |
| connect(Host, Port, Login, Pass) ->
 | |
|     %% STOMP CONNECT frame
 | |
|     Connect = connect_frame(Host, Login, Pass),
 | |
| 
 | |
|     %% TODO: Configurable buffer size
 | |
|     %% Frames larger than the user-level buffer will be truncated, so it should
 | |
|     %% never be smaller than the largest expected messages.
 | |
|     {ok, Socket} = gen_tcp:connect(Host, Port, [binary,
 | |
|                                                 {packet, line},
 | |
|                                                 {line_delimiter, $\0},
 | |
|                                                 {buffer, 262144}]),
 | |
| 
 | |
|     ok = gen_tcp:send(Socket, Connect),
 | |
|     {ok, Socket}.
 | |
| 
 | |
| -spec subscribe(port(), sub_id(), destination(), ack_mode()) -> ok.
 | |
| subscribe(Socket, Id, Queue, Ack) ->
 | |
|     {ok, SubscribeFrame} = subscribe_frame(Id, Queue, Ack),
 | |
|     gen_tcp:send(Socket, SubscribeFrame).
 | |
| 
 | |
| %%% Parsing STOMP frames
 | |
| 
 | |
| handle_frame(<<"MESSAGE", "\n", _Frame/binary>>, State) ->
 | |
|     {noreply, State};
 | |
| handle_frame(Frame, State) ->
 | |
|     io:format("Received unknown frame ~p", [Frame]),
 | |
|     {noreply, State}.
 | |
| 
 | |
| 
 | |
| %% Parse out headers into a map
 | |
| -spec parse_headers(binary()) -> headers().
 | |
| parse_headers(HeadersBin) ->
 | |
|     Headers = binary:split(HeadersBin, <<"\n">>, [global]),
 | |
|     ToPairs = fun(H, M) -> [K,V | []] = binary:split(H, <<":">>),
 | |
|                            maps:put(K, V, M)
 | |
|               end,
 | |
|     {ok, lists:mapfoldl(ToPairs, #{}, Headers)}.
 | |
| 
 | |
| %%% Making STOMP protocol frames
 | |
| 
 | |
| %% Format a header
 | |
| -spec format_header({binary(), binary()}) -> binary().
 | |
| format_header({Key, Val}) ->
 | |
|     <<Key/binary, ":", Val/binary, "\n">>.
 | |
| 
 | |
| %% Build a single STOMP frame
 | |
| -spec make_frame(binary(),
 | |
|                  headers(),
 | |
|                  binary())
 | |
|                 -> {ok, iolist()}.
 | |
| make_frame(Command, HeaderMap, Body) ->
 | |
|     Headers = lists:map(fun format_header/1, maps:to_list(HeaderMap)),
 | |
|     Frame = [Command, <<"\n">>, Headers, <<"\n">>, Body, <<0>>],
 | |
|     {ok, Frame}.
 | |
| 
 | |
| %%% Default frames
 | |
| 
 | |
| -spec connect_frame(list(), any(), any()) -> iolist().
 | |
| connect_frame(Host, {ok, Login}, {ok, Pass}) ->
 | |
|     make_frame(<<"CONNECT">>,
 | |
|                #{<<"accept-version">> => <<"1.2">>,
 | |
|                  <<"host">>           => Host,
 | |
|                  <<"login">>          => Login,
 | |
|                  <<"passcode">>       => Pass,
 | |
|                  <<"heart-beat">>     => <<"0,5000">>},
 | |
|                []);
 | |
| connect_frame(Host, _Login, _Pass) ->
 | |
|     make_frame(<<"CONNECT">>,
 | |
|                #{<<"accept-version">> => <<"1.2">>,
 | |
|                  <<"host">>           => Host,
 | |
|                  %% Expect a server heartbeat every 5 seconds, let the server
 | |
|                  %% expect one every 10. We don't actually check this and just
 | |
|                  %% echo server heartbeats.
 | |
|                  %% TODO: For now the server is told not to expect replies due to
 | |
|                  %% a weird behaviour.
 | |
|                  <<"heart-beat">>     => <<"0,5000">>},
 | |
|                []).
 | |
| 
 | |
| 
 | |
| -spec subscribe_frame(sub_id(), destination(), ack_mode()) -> iolist().
 | |
| subscribe_frame(Id, Queue, Ack) ->
 | |
|     make_frame(<<"SUBSCRIBE">>,
 | |
|                #{<<"id">>          => integer_to_binary(Id),
 | |
|                  <<"destination">> => Queue,
 | |
|                  <<"ack">>         => ack_mode_to_binary(Ack)},
 | |
|                []).
 | |
| 
 | |
| -spec ack_mode_to_binary(ack_mode()) -> binary().
 | |
| ack_mode_to_binary(AckMode) ->
 | |
|     case AckMode of
 | |
|         auto              -> <<"auto">>;
 | |
|         client            -> <<"client">>;
 | |
|         client_individual -> <<"client-individual">>
 | |
|     end.
 | |
| 
 | |
| %% -spec ack_frame(binary()) -> iolist().
 | |
| %% ack_frame(MessageID) ->
 | |
| %%     make_frame(<<"ACK">>,
 | |
| %%                [{"id", MessageID}],
 | |
| %%                []).
 |