I am trying to implement an Erlang Twitter-like application implemented on the data_actor module. It allows users to register, tweet, fetch all their tweets, subscribe to the tweets of other users, and get their timeline which includes both their tweets and those of users they are subscribed to. The application has a the data_actor loop as the main server and the worker_process loop for distributed servers. On user registration, the data_actor loop (main server) assigns each user a worker server (which uses the worker_process loop).
In addition, I have a server module which is just the interface to the application.
I have written 7 test cases but the last test (subscription_test) fails with the following error. I have looked at a few suggested solutions to this timeout test. Some suggest skipping test on timeout. Firstly, I have not included a timeout for these tests. Secondly, I want all the tests to be successful. It appears I have a logical error. Being new to Erlang, I can hardly point out what is amiss.
Eshell V9.0.4
(Erlang_Project_twitter@GAKUO)1> data_actor:test().
data_actor: subscription_test...*timed out*
undefined
=======================================================
Failed: 0. Skipped: 0. Passed: 6.
One or more tests were cancelled.
error
(Erlang_Project_twitter@GAKUO)2>
Below is the data_actor module with the application implementation logic and the 7 tests:
%% This is a simple implementation of the project, using one centralized server.
%%
%% It will create one "server" actor that contains all internal state (users,
%% their subscriptions, and their tweets).
%%
%% This implementation is provided with unit tests, however, these tests are
%% neither complete nor implementation independent, so be careful when reusing
%% them.
-module(data_actor).
-include_lib("eunit/include/eunit.hrl").
%%
%% Exported Functions
%%
-export([initialize/0,
% internal actors
data_actor/1,worker_process/1]).
%%
%% API Functions
%%
% Start server.
% This returns the pid of the server, but you can also use the name "data_actor"
% to refer to it.
initialize() ->
ServerPid = spawn_link(?MODULE, data_actor, [[]]),
register(data_actor, ServerPid),
ServerPid.
% The data actor works like a small database and encapsulates all state of this
% simple implementation.
data_actor(Data) ->
receive
{Sender, register_user} ->
{NewData, NewUserId} = add_new_user(Data),
Sender ! {self(), registered_user, NewUserId},
data_actor(NewData);
{Sender, get_timeline, UserId, Page} ->
{user, UserId, Worker_pid, _Tweets, _Subscriptions} = lists:nth(UserId + 1, Data),
Worker_pid ! {Sender, get_timeline, UserId, Page},
data_actor(Data);
{Sender, get_tweets, UserId, Page} ->
{user, UserId, Worker_pid, _Tweets, _Subscriptions} = lists:nth(UserId + 1, Data),
Worker_pid ! {Sender, get_tweets, UserId, Page},
data_actor(Data);
{Sender, tweet, UserId, Tweet} ->
{user, UserId, Worker_pid, _Tweets, _Subscriptions} = lists:nth(UserId + 1, Data),
Worker_pid ! {Sender, tweet, UserId, Tweet},
data_actor(Data);
{Sender, subscribe, UserId, UserIdToSubscribeTo} ->
{user, UserId, Worker_pid, _Tweets, _Subscriptions} = lists:nth(UserId + 1, Data),
{user, UserId, SubscribedToWorker_pid, _Tweets, _Subscriptions} = lists:nth(UserId + 1, Data),
Worker_pid ! {Sender, subscribe, UserId, UserIdToSubscribeTo,SubscribedToWorker_pid},
data_actor(Data)
end.
%%% data actor internal processes
add_new_user(Data) ->
%create new user user and assign user id
NewUserId = length(Data),
% start a worker process for the newly registred user
Worker_pid = spawn_link(?MODULE,worker_process,[[{user, NewUserId, [], sets:new()}]]),
%add the worker_pid to the data list
NewData = Data ++ [{user, NewUserId,Worker_pid, [], sets:new()}],
{NewData, NewUserId}.
%%% worker actor
worker_process(Data)->
receive
{Sender, get_timeline, UserId, Page} ->
Sender ! {self(), timeline, UserId, Page, timeline(Data, UserId, Page)},
worker_process(Data);
{Sender, get_tweets, UserId, Page} ->
Sender ! {self(), tweets, UserId, Page, tweets(Data, UserId, Page)},
worker_process(Data);
{Sender, tweet, UserId, Tweet} ->
{NewData, Timestamp} = tweet(Data, UserId, Tweet),
Sender ! {self(), tweet_accepted, UserId, Timestamp},
worker_process(NewData);
{Sender, subscribe, UserId, UserIdToSubscribeTo, SubscribedToWorker_pid} ->
NewData = subscribe_to_user(Data, UserId, UserIdToSubscribeTo,SubscribedToWorker_pid),
Sender ! {self(), subscribed, UserId, UserIdToSubscribeTo},
worker_process(NewData)
end.
%%
%% worker actor internal Functions
%%
timeline(Data, UserId, Page) ->
[ {user, UserId, Tweets, Subscriptions}] = Data,
UnsortedTweetsForTimeLine =
lists:foldl(fun(Worker_pid, AllTweets) ->
Worker_pid ! {self(), get_tweets, UserId, Page},
receive
{_ResponsePid, tweets, UserId, Page, Subscribed_Tweets} ->Subscribed_Tweets
end,
AllTweets ++ Subscribed_Tweets
end,
Tweets,
sets:to_list(Subscriptions)),
SortedTweets = lists:reverse(lists:keysort(3, UnsortedTweetsForTimeLine)),
lists:sublist(SortedTweets, 10).
tweets(Data,_UserId, _Page) ->
[{user, _UserId, Tweets, _Subscriptions}]= Data,
Tweets.
tweet(Data, UserId, Tweet) ->
[ {user, UserId, Tweets, Subscriptions}] = Data,
Timestamp = erlang:timestamp(),
NewUser = [{user, UserId, Tweets ++ [{tweet, UserId, Timestamp, Tweet}], Subscriptions}],
{ NewUser, Timestamp}.
subscribe_to_user(Data, UserId, _UserIdToSubscribeTo,SubscribedToWorker_pid) ->
[{user, UserId,Tweets, Subscriptions}] = Data,
NewUser = [{user, UserId, Tweets, sets:add_element(SubscribedToWorker_pid, Subscriptions)}],
NewUser.
%%
%% Test Functions
%%
%% These tests are for this specific implementation. They are a partial
%% definition of the semantics of the provided interface but also make certain
%% assumptions of its implementation. Thus, they need to be reused with care.
%%
initialization_test() ->
catch unregister(data_actor),
initialize().
register_user_test() ->
ServerPid = initialization_test(),
% We assume here that everything is sequential, and we have simple
% incremental ids
?assertMatch({0, _Pid1}, server:register_user(ServerPid)),
?assertMatch({1, _Pid2}, server:register_user(ServerPid)),
?assertMatch({2, _Pid3}, server:register_user(ServerPid)),
?assertMatch({3, _Pid4}, server:register_user(ServerPid)).
init_for_test() ->
ServerPid = initialization_test(),
{0, Pid1} = server:register_user(ServerPid),
{1, Pid2} = server:register_user(ServerPid),
{2, Pid3} = server:register_user(ServerPid),
{3, Pid4} = server:register_user(ServerPid),
[Pid1, Pid2, Pid3, Pid4].
timeline_test() ->
Pids = init_for_test(),
[Pid1, Pid2 | _ ] = Pids,
?assertMatch([], server:get_timeline(Pid1, 1, 0)),
?assertMatch([], server:get_timeline(Pid2, 2, 0)).
users_tweets_test() ->
Pids = init_for_test(),
[Pid1 | _ ] = Pids,
?assertMatch([], server:get_tweets(Pid1, 1, 0)),
?assertMatch([], server:get_tweets(Pid1, 2, 0)).
tweet_test() ->
Pids = init_for_test(),
[Pid1, Pid2 | _ ] = Pids,
?assertMatch([], server:get_timeline(Pid1, 1, 0)),
?assertMatch([], server:get_timeline(Pid2, 2, 0)),
?assertMatch({_, _Secs, _MicroSecs}, server:tweet(Pid1, 1, "Tweet no. 1")),
?assertMatch([{tweet, _, _, "Tweet no. 1"}], server:get_tweets(Pid1, 1, 0)),
?assertMatch([], server:get_tweets(Pid1, 2, 0)),
?assertMatch([{tweet, _, _, "Tweet no. 1"}], server:get_timeline(Pid1, 1, 0)), % own tweets included in timeline
?assertMatch([], server:get_timeline(Pid2, 2, 0)),
Pids. % no subscription
subscription_test() ->
[_Pid1, Pid2 | _ ] = tweet_test(),
?assertMatch(ok, server:subscribe(Pid2, 2, 1)),
?assertMatch([{tweet, _, _, "Tweet no. 1"}], server:get_timeline(Pid2, 2, 0)), % now there is a subscription
?assertMatch({_, _Secs, _MicroSecs}, server:tweet(Pid2, 2, "Tweet no. 2")),
?assertMatch([{tweet, _, _, "Tweet no. 2"},
{tweet, _, _, "Tweet no. 1"}],
server:get_timeline(Pid2, 2, 0)),
done.
Below is the server module which is the interface to my application:
%% This module provides the protocol that is used to interact with an
%% implementation of a microblogging service.
%%
%% The interface is design to be synchrounous: it waits for the reply of the
%% system.
%%
%% This module defines the public API that is supposed to be used for
%% experiments. The semantics of the API here should remain unchanged.
-module(server).
-export([register_user/1,
subscribe/3,
get_timeline/3,
get_tweets/3,
tweet/3]).
%%
%% Server API
%%
% Register a new user. Returns its id and a pid that should be used for
% subsequent requests by this client.
-spec register_user(pid()) -> {integer(), pid()}.
register_user(ServerPid) ->
ServerPid ! {self(), register_user},
receive
{ResponsePid, registered_user, UserId} -> {UserId, ResponsePid}
end.
% Subscribe/follow another user.
-spec subscribe(pid(), integer(), integer()) -> ok.
subscribe(ServerPid, UserId, UserIdToSubscribeTo) ->
ServerPid ! {self(), subscribe, UserId, UserIdToSubscribeTo},
receive
{_ResponsePid, subscribed, UserId, UserIdToSubscribeTo} -> ok
end.
% Request a page of the timeline of a particular user.
% Request results can be 'paginated' to reduce the amount of data to be sent in
% a single response. This is up to the server.
-spec get_timeline(pid(), integer(), integer()) -> [{tweet, integer(), erlang:timestamp(), string()}].
get_timeline(ServerPid, UserId, Page) ->
ServerPid ! {self(), get_timeline, UserId, Page},
receive
{_ResponsePid, timeline, UserId, Page, Timeline} ->
Timeline
end.
% Request a page of tweets of a particular user.
% Request results can be 'paginated' to reduce the amount of data to be sent in
% a single response. This is up to the server.
-spec get_tweets(pid(), integer(), integer()) -> [{tweet, integer(), erlang:timestamp(), string()}].
get_tweets(ServerPid, UserId, Page) ->
ServerPid ! {self(), get_tweets, UserId, Page},
receive
{_ResponsePid, tweets, UserId, Page, Tweets} ->
Tweets
end.
% Submit a tweet for a user.
% (Authorization/security are not regarded in any way.)
-spec tweet(pid(), integer(), string()) -> erlang:timestamp().
tweet(ServerPid, UserId, Tweet) ->
ServerPid ! {self(), tweet, UserId, Tweet},
receive
{_ResponsePid, tweet_accepted, UserId, Timestamp} ->
Timestamp
end.
Aucun commentaire:
Enregistrer un commentaire