Commit 9e6903ee authored by Aggelos Giantsios

Reorder functions & Change the type sig of init

parent cf73923e
@@ -3,7 +3,10 @@
-- orbit-int master (controlling orbit computation)
--
module MasterWorker( -- Master
orbit
GenClos
, HostInfo (..)
, MaybeHosts(..)
, orbit
, get_gens, get_master, get_workers, get_spawn_img_comp
, get_global_table_size, get_idle_timeout
, set_idle_timeout, clear_spawn_img_comp
@@ -73,151 +76,6 @@ type ParConf =
type WorkerStats = [(String, String)]
-- DATA
-- Static Machine Configuration:
-- {Gs, %list of generators
-- Master, %pid of master process
-- Workers, %list of Worker
-- GlobalTableSize, %size of global hash table
-- IdleTimeout, %milliseconds this worker idles before sending 'done'
-- SpawnImgComp} %true iff this worker spawns image computations
--
-- Worker:
-- {Pid, %pid of worker process
-- TableOffset, %offset (= index 0) of local table into global table
-- TableSize} %size of local hash table
--
-- Host:
-- {Node, %atom naming Erlang node
-- Procs, %number of processors
-- TableSize, %size of hash table per processor
-- IdleTimeout} %milliseconds a processor idles before sending 'done'
--
-- Statistics:
-- List of pairs where the first component is an atom, the second
-- some data. Part of the data is the fill frequency of the table
-- (a list whose ith element indicates frequency of filling degree i).
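--
-- A minimal sketch (assumed types, not the module's actual definitions) of
-- the configuration above as Haskell tuples, following the 6-field layout:
--
--   type Worker' = (ProcessId, Int, Int)   -- (Pid, TableOffset, TableSize)
--   type StaticMachConf' = (GenClos, ProcessId, [Worker'], Int, Int, Bool)
--   get_gens' (gs, _, _, _, _, _) = gs     -- the other get_*/set_* accessors
--                                          -- project/update analogously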
-- MESSAGES
-- Master -> Worker: {init, StaticMachConf}
--
-- Master/Worker -> Worker: {vertex, X, Slot, K}
-- %X is vertex
-- %Slot is slot of X on target worker
-- %K is atomic credit shipped with vertex
--
-- Worker -> Master: {done, Cs}
-- %Cs is non-zero credit (rep as list of ints)
--
-- Master -> Worker: {dump}
--
-- Worker -> Master: {result, Xs, Stats}
-- %Xs is list of found orbit vertices
-- %Stats is statistics about worker's table
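--
-- A sketch (hypothetical handler names, message encodings inferred from the
-- sends below) of how a worker could dispatch on these messages:
--
--   workerLoop conf = receiveWait
--     [ match $ \("vertex", x, slot, k) -> handleVertex conf x slot k
--     , match $ \"dump"                 -> dumpTable conf
--     ]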
-- compute orbit of elements in list Xs under list of generators Gs;
-- the argument Hosts is either an integer N, a quadruple {P, N, T, S}, or
-- a non-empty list [{H, P, N, T, S} | ...] of quintuples:
-- * N: run the sequential algorithm with table size N
-- * {P, N, T, S}: run the parallel algorithm on P processors
-- each with table size N, idle timeout T and
-- spawn image computation flag S;
-- * [{H, P, N, T, S} | ...]: run the distributed algorithm on the list of
-- hosts, where each quintuple {H, P, N, T, S}
-- specifies
-- * host name H (i.e. name of Erlang node),
-- * number of processors P on H,
-- * table size N (per processor),
-- * idle timeout T, and
-- * spawn image computation flag S.
-- The function returns a pair consisting of the computed orbit and
-- a list of statistics, the first element of which reports overall statistics,
-- and all remaining elements report statistics of some worker.
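--
-- A usage sketch (hypothetical generators and start vertices; the GenClos
-- payload shape is inferred from the pattern match below, and Vertex is
-- assumed to be Int):
--
--   gens :: GenClos
--   gens = GenClos ("demo", 1, [\x -> (x + 1) `mod` 97])
--   demoSeq = orbit gens [1] (Seq 1024)                            -- sequential
--   demoPar = orbit gens [1] (Par (JustOne (4, 1024, 100, True)))  -- 4 local workers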
orbit :: GenClos -> [Vertex] -> MaybeHosts -> Process ([Vertex], [MasterStats])
orbit (GenClos (_, _, gs)) xs (Seq tablesize) =
return $ Sq.orbit gs xs tablesize
orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo
par_orbit :: GenClos -> [Vertex] -> HostInfo
-> Process ([Vertex], [MasterStats])
par_orbit gs xs hosts = do
-- spawn workers on Hosts
(workers, globTabSize) <- start_workers hosts
self <- getSelfPid
let -- assemble StaticMachConf and distribute to Workers
staticMachConf = mk_static_mach_conf gs self workers globTabSize
mapM_ (\(pid, _, _) -> send pid ("init", staticMachConf)) workers
let -- start wall clock timer
startTime = now
-- distribute initial vertices to workers
credit <- distribute_vertices staticMachConf one xs
-- collect credit handed back by idle workers
collect_credit credit
let -- measure elapsed time (in milliseconds)
elapsedTime = now - startTime
-- tell all workers to dump their tables
mapM_ (\(pid, _, _) -> send pid "dump") workers
-- collect results from all workers and return them
collect_orbit elapsedTime (length workers)
-- start_workers starts worker processes depending on the input Hosts:
-- * if Hosts is a quadruple {P, _, _, _} then P processes are forked on the
-- executing Erlang node;
-- * if Hosts is a non-empty list {H1, P1, _, _, _}, {H2, P2, _, _, _}, ...
-- then P1 processes are forked on Erlang node H1, P2 processes on node H2,
-- and so on.
-- The function returns a pair {Workers, GlobalTableSize}, where
-- * GlobalTableSize is the total number of slots of the global hash table, and
-- * Workers is a list of Worker, sorted wrt. TableOffset in ascending order.
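-- For instance (hypothetical pids), starting two local workers with table
-- size 50 each would yield ([(pidA, 0, 50), (pidB, 50, 50)], 100): each
-- worker's TableOffset is the sum of the table sizes started before it.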
start_workers :: HostInfo -> Process ([(ProcessId, Int, Int)], Int)
start_workers (JustOne host) = do
(workers, globalTableSize) <- do_start_shm host ([], 0)
return (reverse workers, globalTableSize)
start_workers (Many hosts) = do
(workers, globalTableSize) <- do_start_dist hosts ([], 0)
return (reverse workers, globalTableSize)
do_start_shm (0, _, _, _) acc = return acc
do_start_shm (m, tabSize, tmOut, spawnImgComp) (workers, gTabSize) = do
selfNode <- getSelfNode
pid <- spawnLink selfNode ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
do_start_shm (m - 1, tabSize, tmOut, spawnImgComp)
((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
do_start_dist [] acc = return acc
do_start_dist ((_, 0, _, _, _) : hosts) acc = do_start_dist hosts acc
do_start_dist ((node,m,tabSize,tmOut,spawnImgComp) : hosts) (workers,gTabSize) = do
pid <- spawnLink node ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
do_start_dist ((node, m - 1, tabSize, tmOut, spawnImgComp) : hosts)
((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
-- collect_credit collects leftover credit from idle workers until
-- the credit adds up to 1.
collect_credit :: Credit -> Process ()
collect_credit crdt
| is_one crdt = return ()
| otherwise = receiveWait [
match $ \("done", workersCredit) ->
collect_credit $ credit workersCredit crdt
]
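-- For example (assuming the list-of-ints credit encoding): if the master
-- holds half of the total credit and one idle worker returns the other half
-- in a ("done", c) message, then `credit c crdt` satisfies is_one and
-- collect_credit terminates.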
-- collect_orbit collects partial orbits and stats from N workers.
collect_orbit :: Int -> Int -> Process ([Vertex], [MasterStats])
collect_orbit elapsedTime n = do
(orbit, stats) <- do_collect_orbit n [] []
return (concat orbit, master_stats elapsedTime stats : stats)
do_collect_orbit :: Int -> [[Vertex]] -> [WorkerStats]
-> Process ([[Vertex]], [WorkerStats])
do_collect_orbit 0 partOrbits workerStats = return (partOrbits, workerStats)
do_collect_orbit n partOrbits workerStats = do
receiveWait [
match $ \("result", partOrbit, workerStat) ->
do_collect_orbit (n - 1) (partOrbit : partOrbits) (workerStat : workerStats)
]
-------------------------------------------------------------------------------
-- auxiliary functions
@@ -292,8 +150,8 @@ defaultCt = Ct { verts_recvd = 0
}
-- initialise worker
init :: Int -> Int -> Bool -> Process ()
init localTableSize idleTimeout spawnImgComp =
init :: (Int, Int, Bool) -> Process ()
init (localTableSize, idleTimeout, spawnImgComp) =
receiveWait [
match $ \("init", staticMachConf0) -> do
let staticMachConf1 = set_idle_timeout staticMachConf0 idleTimeout
@@ -303,8 +161,6 @@ init localTableSize idleTimeout spawnImgComp =
vertex_server staticMachConf zero (new localTableSize) defaultCt
]
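-- Note: init now takes its three parameters as a single tuple so that the
-- $(mkClosure 'init) splices in do_start_shm/do_start_dist can ship them as
-- the one serialisable argument a remote spawn expects.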
remotable ['init]
-- main worker loop: server handling vertex messages;
-- StaticMachConf: info about machine configuration
-- Credit: credit currently held by the server,
@@ -493,3 +349,159 @@ tail_idle_from_stat stat =
max_idle_from_stat :: WorkerStats -> Int
max_idle_from_stat stat =
read (fromJust ("max_idle_time" `lookup` stat)) :: Int
remotable ['init]
--
-- orbit-int master (controlling orbit computation)
--
-- DATA
-- Static Machine Configuration:
-- {Gs, %list of generators
-- Master, %pid of master process
-- Workers, %list of Worker
-- GlobalTableSize, %size of global hash table
-- IdleTimeout, %milliseconds this worker idles before sending 'done'
-- SpawnImgComp} %true iff this worker spawns image computations
--
-- Worker:
-- {Pid, %pid of worker process
-- TableOffset, %offset (= index 0) of local table into global table
-- TableSize} %size of local hash table
--
-- Host:
-- {Node, %atom naming Erlang node
-- Procs, %number of processors
-- TableSize, %size of hash table per processor
-- IdleTimeout} %milliseconds a processor idles before sending 'done'
--
-- Statistics:
-- List of pairs where the first component is an atom, the second
-- some data. Part of the data is the fill frequency of the table
-- (a list whose ith element indicates frequency of filling degree i).
-- MESSAGES
-- Master -> Worker: {init, StaticMachConf}
--
-- Master/Worker -> Worker: {vertex, X, Slot, K}
-- %X is vertex
-- %Slot is slot of X on target worker
-- %K is atomic credit shipped with vertex
--
-- Worker -> Master: {done, Cs}
-- %Cs is non-zero credit (rep as list of ints)
--
-- Master -> Worker: {dump}
--
-- Worker -> Master: {result, Xs, Stats}
-- %Xs is list of found orbit vertices
-- %Stats is statistics about worker's table
-- compute orbit of elements in list Xs under list of generators Gs;
-- the argument Hosts is either an integer N, a quadruple {P, N, T, S}, or
-- a non-empty list [{H, P, N, T, S} | ...] of quintuples:
-- * N: run the sequential algorithm with table size N
-- * {P, N, T, S}: run the parallel algorithm on P processors
-- each with table size N, idle timeout T and
-- spawn image computation flag S;
-- * [{H, P, N, T, S} | ...]: run the distributed algorithm on the list of
-- hosts, where each quintuple {H, P, N, T, S}
-- specifies
-- * host name H (i.e. name of Erlang node),
-- * number of processors P on H,
-- * table size N (per processor),
-- * idle timeout T, and
-- * spawn image computation flag S.
-- The function returns a pair consisting of the computed orbit and
-- a list of statistics, the first element of which reports overall statistics,
-- and all remaining elements report statistics of some worker.
orbit :: GenClos -> [Vertex] -> MaybeHosts -> Process ([Vertex], [MasterStats])
orbit (GenClos (_, _, gs)) xs (Seq tablesize) =
return $ Sq.orbit gs xs tablesize
orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo
par_orbit :: GenClos -> [Vertex] -> HostInfo
-> Process ([Vertex], [MasterStats])
par_orbit gs xs hosts = do
-- spawn workers on Hosts
(workers, globTabSize) <- start_workers hosts
self <- getSelfPid
let -- assemble StaticMachConf and distribute to Workers
staticMachConf = mk_static_mach_conf gs self workers globTabSize
mapM_ (\(pid, _, _) -> send pid ("init", staticMachConf)) workers
let -- start wall clock timer
startTime = now
-- distribute initial vertices to workers
credit <- distribute_vertices staticMachConf one xs
-- collect credit handed back by idle workers
collect_credit credit
let -- measure elapsed time (in milliseconds)
elapsedTime = now - startTime
-- tell all workers to dump their tables
mapM_ (\(pid, _, _) -> send pid "dump") workers
-- collect results from all workers and return them
collect_orbit elapsedTime (length workers)
-- start_workers starts worker processes depending on the input Hosts:
-- * if Hosts is a quadruple {P, _, _, _} then P processes are forked on the
-- executing Erlang node;
-- * if Hosts is a non-empty list {H1, P1, _, _, _}, {H2, P2, _, _, _}, ...
-- then P1 processes are forked on Erlang node H1, P2 processes on node H2,
-- and so on.
-- The function returns a pair {Workers, GlobalTableSize}, where
-- * GlobalTableSize is the total number of slots of the global hash table, and
-- * Workers is a list of Worker, sorted wrt. TableOffset in ascending order.
start_workers :: HostInfo -> Process ([(ProcessId, Int, Int)], Int)
start_workers (JustOne host) = do
(workers, globalTableSize) <- do_start_shm host ([], 0)
return (reverse workers, globalTableSize)
start_workers (Many hosts) = do
(workers, globalTableSize) <- do_start_dist hosts ([], 0)
return (reverse workers, globalTableSize)
do_start_shm :: (Int, Int, Int, Bool) -> ([(ProcessId, Int, Int)], Int)
-> Process ([(ProcessId, Int, Int)], Int)
do_start_shm (0, _, _, _) acc = return acc
do_start_shm (m, tabSize, tmOut, spawnImgComp) (workers, gTabSize) = do
selfNode <- getSelfNode
pid <- spawnLink selfNode ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
do_start_shm (m - 1, tabSize, tmOut, spawnImgComp)
((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
do_start_dist :: [(NodeId, Int, Int, Int, Bool)] -> ([(ProcessId, Int, Int)], Int)
-> Process ([(ProcessId, Int, Int)], Int)
do_start_dist [] acc = return acc
do_start_dist ((_, 0, _, _, _) : hosts) acc = do_start_dist hosts acc
do_start_dist ((node,m,tabSize,tmOut,spawnImgComp) : hosts) (workers,gTabSize) = do
pid <- spawnLink node ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
do_start_dist ((node, m - 1, tabSize, tmOut, spawnImgComp) : hosts)
((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
-- collect_credit collects leftover credit from idle workers until
-- the credit adds up to 1.
collect_credit :: Credit -> Process ()
collect_credit crdt
| is_one crdt = return ()
| otherwise = receiveWait [
match $ \("done", workersCredit) ->
collect_credit $ credit workersCredit crdt
]
-- collect_orbit collects partial orbits and stats from N workers.
collect_orbit :: Int -> Int -> Process ([Vertex], [MasterStats])
collect_orbit elapsedTime n = do
(orbit, stats) <- do_collect_orbit n [] []
return (concat orbit, master_stats elapsedTime stats : stats)
do_collect_orbit :: Int -> [[Vertex]] -> [WorkerStats]
-> Process ([[Vertex]], [WorkerStats])
do_collect_orbit 0 partOrbits workerStats = return (partOrbits, workerStats)
do_collect_orbit n partOrbits workerStats = do
receiveWait [
match $ \("result", partOrbit, workerStat) ->
do_collect_orbit (n - 1) (partOrbit : partOrbits) (workerStat : workerStats)
]