Commit cf73923e authored by Yiannis Tsiouris's avatar Yiannis Tsiouris

Implement rest of Master functions + Serializeable stuff

parent c9b885e5
{-# LANGUAGE TemplateHaskell, DeriveDataTypeable #-}
-- --
-- orbit-int master (controlling orbit computation) -- orbit-int master (controlling orbit computation)
-- --
module MasterWorker where module MasterWorker( -- Master
{- orbit
module Worker( init , get_gens, get_master, get_workers, get_spawn_img_comp
, get_global_table_size, get_idle_timeout
, set_idle_timeout, clear_spawn_img_comp
-- Worker
, init
, distribute_vertices , distribute_vertices
, send_image , send_image
, verts_recvd_from_stat , verts_recvd_from_stat
...@@ -14,25 +19,30 @@ module Worker( init ...@@ -14,25 +19,30 @@ module Worker( init
, max_idle_from_stat , max_idle_from_stat
, WorkerStats , WorkerStats
) where ) where
-}
import Control.Distributed.Process (Process, ProcessId, NodeId, import Control.Distributed.Process
getSelfNode, match, import Control.Distributed.Process.Closure
receiveTimeout, receiveWait, import Data.Binary
send, spawnLocal)
import Data.Hashable (hash) import Data.Hashable (hash)
import Data.Maybe (fromJust) import Data.Maybe (fromJust)
import Data.Typeable
import Prelude hiding (init)
import Credit (ACredit, Credit, credit, import Credit
credit_atomic, debit_atomic,
debit_atomic_nz, is_one, is_zero,
zero)
import qualified Sequential as Sq (Generator, orbit) import qualified Sequential as Sq (Generator, orbit)
import Table (Freq, Vertex, VTable, import Table
freq_from_stat, freq_to_stat, import Utils (dispatcher, now)
get_freq, insert, is_member,
new, sum_freqs, to_list) -- Trying to serialize ParConf closures...
import Utils (now) newtype GenClos = GenClos (String, Int, [Sq.Generator])
deriving (Typeable)
-- Show instance for GenClos: renders only the name component of the closure;
-- the integer parameter and the generator list are not shown.
instance Show GenClos where
  showsPrec _ (GenClos (label, _, _)) = showString label
-- Binary instance for GenClos: only the name and the integer parameter are
-- written out; the generator list itself is never serialized.  When decoding,
-- the list is rebuilt by handing the name and parameter to 'dispatcher'.
instance Binary GenClos where
  put (GenClos (label, param, _)) = put (label, param)
  get = do
    (label, param) <- get
    return (GenClos (label, param, dispatcher label param))
-- counters/timers record -- counters/timers record
data Ct = Ct { verts_recvd :: Int -- #vertices received by this server so far data Ct = Ct { verts_recvd :: Int -- #vertices received by this server so far
...@@ -59,7 +69,7 @@ data HostInfo = JustOne (Int, -- Number of processes ...@@ -59,7 +69,7 @@ data HostInfo = JustOne (Int, -- Number of processes
Int, -- Idle timeout Int, -- Idle timeout
Bool)] -- Spawn image comp Bool)] -- Spawn image comp
type ParConf = type ParConf =
([Sq.Generator], ProcessId, [(ProcessId, Int, Int)], Int, Int, Bool) (GenClos, ProcessId, [(ProcessId, Int, Int)], Int, Int, Bool)
type WorkerStats = [(String, String)] type WorkerStats = [(String, String)]
...@@ -125,22 +135,70 @@ type WorkerStats = [(String, String)] ...@@ -125,22 +135,70 @@ type WorkerStats = [(String, String)]
-- The function returns a pair consisting of the computed orbit and -- The function returns a pair consisting of the computed orbit and
-- a list of statistics, the first element of which reports overall statistics, -- a list of statistics, the first element of which reports overall statistics,
-- and all remaining elements report statistics of some worker. -- and all remaining elements report statistics of some worker.
orbit :: [Sq.Generator] -> [Vertex] -> MaybeHosts -> ([Vertex], [MasterStats]) orbit :: GenClos -> [Vertex] -> MaybeHosts -> Process ([Vertex], [MasterStats])
orbit gs xs (Seq tablesize) = Sq.orbit gs xs tablesize orbit (GenClos (_, _, gs)) xs (Seq tablesize) =
return $ Sq.orbit gs xs tablesize
orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo
-- FIXME Write the proper par_orbit par_orbit :: GenClos -> [Vertex] -> HostInfo
par_orbit :: [Sq.Generator] -> [Vertex] -> HostInfo -> Process ([Vertex], [MasterStats])
-> ([Vertex], [MasterStats]) par_orbit gs xs hosts = do
par_orbit gs xs hosts = ([42], [[("xxx", "xxx")]]) -- spawn workers on Hosts
(workers, globTabSize) <- start_workers hosts
self <- getSelfPid
let -- assemble StaticMachConf and distribute to Workers
staticMachConf = mk_static_mach_conf gs self workers globTabSize
mapM_ (\(pid, _, _) -> send pid ("init", staticMachConf)) workers
let -- start wall clock timer
startTime = now
-- distribute initial vertices to workers
credit <- distribute_vertices staticMachConf one xs
-- collect credit handed back by idle workers
collect_credit credit
let -- measure elapsed time (in milliseconds)
elapsedTime = now - startTime
-- tell all workers to dump their tables
mapM_ (\(pid, _, _) -> send pid "dump") workers
-- collect results from all workers and return them
collect_orbit elapsedTime (length workers)
-- start_workers spawns the worker processes described by the given HostInfo:
--   * 'JustOne' carries a single 4-tuple whose first component is the number
--     of processes; they are spawned via do_start_shm on the node returned
--     by getSelfNode;
--   * 'Many' carries a list of 5-tuples (node, number of processes, ...);
--     do_start_dist spawns the requested number of processes on each node.
-- The result is a pair (workers, globalTableSize), where
--   * globalTableSize is the sum of the table sizes of all spawned workers;
--   * workers is a list of triples (pid, table offset, table size).  The
--     helpers accumulate it newest-first, so it is reversed here to put the
--     workers in ascending order of table offset.
start_workers :: HostInfo -> Process ([(ProcessId, Int, Int)], Int)
start_workers hostInfo = do
  (revWorkers, globalTableSize) <- case hostInfo of
    JustOne host -> do_start_shm host ([], 0)
    Many hosts   -> do_start_dist hosts ([], 0)
  return (reverse revWorkers, globalTableSize)
-- do_start_shm spawns (and links to) the requested number of workers on the
-- node this process runs on.
--
-- First argument: (number of workers still to spawn, table size per worker,
--                  idle timeout, spawn-image-computation flag).
-- Second argument: accumulator (workers spawned so far, newest first;
--                  running global table size).  Each new worker is recorded
--                  as (pid, table offset, table size), where the offset is
--                  the running global table size at the time of spawning.
--
-- A non-positive worker count ends the recursion.  Matching only on 0 would
-- make a negative count spawn workers without ever terminating.
do_start_shm (m, tabSize, tmOut, spawnImgComp) acc@(workers, gTabSize)
  | m <= 0 = return acc
  | otherwise = do
      selfNode <- getSelfNode
      pid <- spawnLink selfNode
                       ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
      do_start_shm (m - 1, tabSize, tmOut, spawnImgComp)
                   ((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
-- do_start_dist spawns (and links to) workers on the listed nodes.
--
-- First argument: list of (node, number of workers still to spawn on that
--                 node, table size per worker, idle timeout,
--                 spawn-image-computation flag).
-- Second argument: accumulator (workers spawned so far, newest first;
--                  running global table size).  Each new worker is recorded
--                  as (pid, table offset, table size), where the offset is
--                  the running global table size at the time of spawning.
--
-- A host entry whose worker count is non-positive is dropped.  Matching only
-- on 0 would make a negative count spawn workers on that node forever.
do_start_dist [] acc = return acc
do_start_dist ((node, m, tabSize, tmOut, spawnImgComp) : hosts)
              acc@(workers, gTabSize)
  | m <= 0 = do_start_dist hosts acc
  | otherwise = do
      pid <- spawnLink node
                       ($(mkClosure 'init) (tabSize, tmOut, spawnImgComp))
      do_start_dist ((node, m - 1, tabSize, tmOut, spawnImgComp) : hosts)
                    ((pid, gTabSize, tabSize) : workers, gTabSize + tabSize)
-- collect_credit collects leftover credit from idle workers until -- collect_credit collects leftover credit from idle workers until
-- the credit adds up to 1. -- the credit adds up to 1.
collect_credit :: Credit -> Process () collect_credit :: Credit -> Process ()
collect_credit crdt = collect_credit crdt
case is_one crdt of | is_one crdt = return ()
True -> return () | otherwise = receiveWait [
False -> receiveWait [
match $ \("done", workersCredit) -> match $ \("done", workersCredit) ->
collect_credit $ credit workersCredit crdt collect_credit $ credit workersCredit crdt
] ]
...@@ -164,12 +222,12 @@ do_collect_orbit n partOrbits workerStats = do ...@@ -164,12 +222,12 @@ do_collect_orbit n partOrbits workerStats = do
-- auxiliary functions -- auxiliary functions
-- functions operating on the StaticMachConf -- functions operating on the StaticMachConf
mk_static_mach_conf :: [Sq.Generator] -> ProcessId -> [(ProcessId, Int, Int)] mk_static_mach_conf :: GenClos -> ProcessId -> [(ProcessId, Int, Int)] -> Int
-> Int -> ParConf -> ParConf
mk_static_mach_conf gs master workers globalTableSize = mk_static_mach_conf gs master workers globalTableSize =
(gs, master, workers, globalTableSize, 0, True) (gs, master, workers, globalTableSize, 0, True)
get_gens :: ParConf -> [Sq.Generator] get_gens :: ParConf -> GenClos
get_gens (gs, _, _, _, _, _) = gs get_gens (gs, _, _, _, _, _) = gs
get_master :: ParConf -> ProcessId get_master :: ParConf -> ProcessId
...@@ -245,9 +303,8 @@ init localTableSize idleTimeout spawnImgComp = ...@@ -245,9 +303,8 @@ init localTableSize idleTimeout spawnImgComp =
vertex_server staticMachConf zero (new localTableSize) defaultCt vertex_server staticMachConf zero (new localTableSize) defaultCt
] ]
vertex_server :: ParConf -> Credit -> VTable -> Ct -> Process () remotable ['init]
vertex_server _ _ _ _ = return ()
{-
-- main worker loop: server handling vertex messages; -- main worker loop: server handling vertex messages;
-- StaticMachConf: info about machine configuration -- StaticMachConf: info about machine configuration
-- Credit: credit currently held by the server, -- Credit: credit currently held by the server,
...@@ -288,7 +345,6 @@ vertex_server staticMachConf credit table statData = do ...@@ -288,7 +345,6 @@ vertex_server staticMachConf credit table statData = do
let newStatData = statData {credit_retd = newCreditRetd} let newStatData = statData {credit_retd = newCreditRetd}
vertex_server staticMachConf zero table newStatData vertex_server staticMachConf zero table newStatData
Just _ -> return () Just _ -> return ()
-}
-- handle_vertex checks whether vertex X is stored in Slot of Table; -- handle_vertex checks whether vertex X is stored in Slot of Table;
-- if not, it is inserted there and the images of the generators
...@@ -306,7 +362,6 @@ handle_vertex staticMachConf x slot credit table ...@@ -306,7 +362,6 @@ handle_vertex staticMachConf x slot credit table
-- return remaining credit and updated table -- return remaining credit and updated table
return (newCredit, newTable) return (newCredit, newTable)
-- return_credit sends non-zero Credit back to the master; -- return_credit sends non-zero Credit back to the master;
-- returns number of times credit has been returned so far -- returns number of times credit has been returned so far
return_credit :: ParConf -> Credit -> Int -> Process Int return_credit :: ParConf -> Credit -> Int -> Process Int
...@@ -335,16 +390,17 @@ distribute_images :: ParConf -> Vertex -> Credit -> Process Credit ...@@ -335,16 +390,17 @@ distribute_images :: ParConf -> Vertex -> Credit -> Process Credit
distribute_images staticMachConf x credit = distribute_images staticMachConf x credit =
do_distribute_images staticMachConf x credit (get_gens staticMachConf) do_distribute_images staticMachConf x credit (get_gens staticMachConf)
do_distribute_images :: ParConf -> Vertex -> Credit -> [Sq.Generator] do_distribute_images :: ParConf -> Vertex -> Credit -> GenClos
-> Process Credit -> Process Credit
do_distribute_images _ _ credit [] = return credit do_distribute_images _ _ credit (GenClos (_, _, [])) =
do_distribute_images staticMachConf x credit [g] = do return credit
do_distribute_images staticMachConf x credit (GenClos (_, _, [g])) = do
let (k, remainingCredit) = debit_atomic credit let (k, remainingCredit) = debit_atomic credit
if get_spawn_img_comp staticMachConf if get_spawn_img_comp staticMachConf
then spawnLocal (send_image staticMachConf x g k) >> return () then spawnLocal (send_image staticMachConf x g k) >> return ()
else send_image staticMachConf x g k else send_image staticMachConf x g k
return remainingCredit return remainingCredit
do_distribute_images staticMachConf x credit (g : gs) = do do_distribute_images staticMachConf x credit (GenClos (_, _, g : gs)) = do
let (k, nonZeroRemainingCredit) = debit_atomic_nz credit let (k, nonZeroRemainingCredit) = debit_atomic_nz credit
if get_spawn_img_comp staticMachConf if get_spawn_img_comp staticMachConf
then spawnLocal (send_image staticMachConf x g k) >> return () then spawnLocal (send_image staticMachConf x g k) >> return ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment