Commit f9fba183 authored by Yiannis Tsiouris's avatar Yiannis Tsiouris

Implement rest Worker functions (not working)

parent c4320ca0
-- orbit-int master (controlling orbit computation)
-- orbit-int worker (computing vertices and holding part of hash table)
module MasterWorker where
module Worker( --init
--, distribute_vertices
--, send_image
module Worker( init
, distribute_vertices
, send_image
, verts_recvd_from_stat
, credit_retd_from_stat
, min_atomic_credit_from_stat
, init_idle_from_stat
......@@ -18,13 +17,20 @@ module Worker( --init
import Control.Distributed.Process (Process, ProcessId, NodeId,
match, receiveWait)
getSelfNode, match,
receiveTimeout, receiveWait,
send, spawnLocal)
import Data.Hashable (hash)
import Data.Maybe (fromJust)
import Credit (credit, is_one)
import Credit (ACredit, Credit, credit, credit_atomic,
debit_atomic, debit_atomic_nz,
is_one, is_zero, zero)
import qualified Sequential as Sq (Generator, orbit)
import Table (Freq, Vertex, freq_from_stat,
freq_to_stat, sum_freqs)
import Table (Freq, Vertex, VTable,
freq_from_stat, freq_to_stat,
get_freq, insert, is_member,
new, sum_freqs, to_list)
import Utils (now)
-- counters/timers record
......@@ -51,7 +57,8 @@ data HostInfo = JustOne (Int, -- Number of processes
Int, -- Table size
Int, -- Idle timeout
Bool)] -- Spawn image comp
type ParConf = ([Sq.Generator], ProcessId, [ProcessId], Int, Int, Bool)
type ParConf =
([Sq.Generator], ProcessId, [(ProcessId, Int, Int)], Int, Int, Bool)
type WorkerStats = [(String, String)]
......@@ -117,19 +124,18 @@ type WorkerStats = [(String, String)]
-- The function returns a pair consisting of the computed orbit and
-- a list of statistics, the first element of which reports overall statistics,
-- and all remaining elements report statistics of some worker.
orbit :: [Vertex -> Vertex] -> [Vertex] -> MaybeHosts
-> ([Vertex], [MasterStats])
orbit :: [Sq.Generator] -> [Vertex] -> MaybeHosts -> ([Vertex], [MasterStats])
orbit gs xs (Seq tablesize) = Sq.orbit gs xs tablesize
orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo
-- FIXME Write the proper par_orbit
par_orbit :: [Vertex -> Vertex] -> [Vertex] -> HostInfo
par_orbit :: [Sq.Generator] -> [Vertex] -> HostInfo
-> ([Vertex], [MasterStats])
par_orbit gs xs hosts = ([42], [[("xxx", "xxx")]])
-- collect_credit collects leftover credit from idle workers until
-- the credit adds up to 1.
collect_credit :: [Int] -> Process ()
collect_credit :: Credit -> Process ()
collect_credit crdt =
case is_one crdt of
True -> return ()
......@@ -157,8 +163,8 @@ do_collect_orbit n partOrbits workerStats = do
-- auxiliary functions
-- functions operating on the StaticMachConf
mk_static_mach_conf :: [Sq.Generator] -> ProcessId -> [ProcessId] -> Int
-> ParConf
mk_static_mach_conf :: [Sq.Generator] -> ProcessId -> [(ProcessId, Int, Int)]
-> Int -> ParConf
mk_static_mach_conf gs master workers globalTableSize =
(gs, master, workers, globalTableSize, 0, True)
......@@ -168,7 +174,7 @@ get_gens (gs, _, _, _, _, _) = gs
get_master :: ParConf -> ProcessId
get_master (_, master, _, _, _, _) = master
get_workers :: ParConf -> [ProcessId]
get_workers :: ParConf -> [(ProcessId, Int, Int)]
get_workers (_, _, workers, _, _, _) = workers
get_global_table_size :: ParConf -> Int
......@@ -212,7 +218,9 @@ master_stats elapsedTime workerStats =
tailIdles = map tail_idle_from_stat workerStats
maxTailIdle = foldl max (head tailIdles) (tail tailIdles)
-- Worker stuff
-- orbit-int worker (computing vertices and holding part of hash table)
defaultCt :: Ct
defaultCt = Ct { verts_recvd = 0
......@@ -224,6 +232,174 @@ defaultCt = Ct { verts_recvd = 0
, max_idle = -1
-- initialise worker
init :: Int -> Int -> Bool -> Process ()
init localTableSize idleTimeout spawnImgComp = do
let table = new localTableSize
receiveWait [
match $ \("init", staticMachConf0) -> do
let statData = defaultCt
staticMachConf1 = set_idle_timeout staticMachConf0 idleTimeout
staticMachConf = if spawnImgComp then staticMachConf1
else clear_spawn_img_comp staticMachConf1
credit = zero
vertex_server staticMachConf credit table statData
-- main worker loop: server handling vertex messages;
-- StaticMachConf: info about machine configuration
-- Credit: credit currently held by the server,
-- Table: hash table holding vertices
-- StatData: various counters and timers for gathering statistics
vertex_server :: ParConf -> Credit -> VTable -> Ct -> Process ()
vertex_server staticMachConf credit table statData = do
let idleTimeout = get_idle_timeout staticMachConf
r <- receiveTimeout idleTimeout [
match $ \("vertex", x, slot, k) -> do
let creditPlusK = credit_atomic k credit
nowTime = now
vertsRecvd = verts_recvd statData
minAtomicCredit = min_atomic_credit statData
lastEvent = last_event statData
initIdle = init_idle statData
maxIdle = max_idle statData
(newCredit, newTable) <-
handle_vertex staticMachConf x slot creditPlusK table
let newStatData0 = statData {verts_recvd = vertsRecvd + 1,
min_atomic_credit = max minAtomicCredit k}
newStatData1 =
if initIdle < 0
then newStatData0 {init_idle = nowTime - lastEvent}
else newStatData0 {max_idle = max maxIdle (nowTime - lastEvent)}
newStatData = newStatData1 {last_event = now}
vertex_server staticMachConf newCredit newTable newStatData
, match $ \"dump" -> do
let nowTime = now
lastEvent = last_event statData
newStatData = statData {tail_idle = nowTime - lastEvent,
last_event = now}
dump_table staticMachConf table newStatData
case r of
Nothing -> do let creditRetd = credit_retd statData
newCreditRetd <- return_credit staticMachConf credit creditRetd
let newStatData = statData {credit_retd = newCreditRetd}
vertex_server staticMachConf zero table newStatData
Just _ -> return ()
-- handle_vertex checks whether vertex X is stored in Slot of Table;
-- if not, it is in inserted there and the images of the generators
-- are distributed among the workers.
-- Precondition: Credit is non-zero.
handle_vertex :: ParConf -> Vertex -> Int -> Credit -> VTable
-> Process (Credit, VTable)
handle_vertex staticMachConf x slot credit table
-- check whether x is already in table
| is_member x slot table = return (credit, table) -- x already in table;
-- do nothing
| otherwise = do -- x not in table
let newTable = insert x slot table -- insert x at slot
-- distribute images of x under generators to their respective workers
newCredit <- distribute_images staticMachConf x credit
-- return remaining credit and updated table
return (newCredit, newTable)
-- return_credit sends non-zero Credit back to the master;
-- returns number of times credit has been returned so far
return_credit :: ParConf -> Credit -> Int -> Process Int
return_credit staticMachConf credit creditRetd
| is_zero credit = return creditRetd
| otherwise = do
let masterPid = get_master staticMachConf
send masterPid ("done", credit)
return (creditRetd + 1)
-- dump_table sends a list containing the local partial orbit to the master,
-- together with some statistics on the distribution of vertices in the table.
dump_table :: ParConf -> VTable -> Ct -> Process ()
dump_table staticMachConf table statData = do
nodeId <- getSelfNode
let masterPid = get_master staticMachConf
stat = worker_stats nodeId (get_freq table) statData
send masterPid ("result", to_list table, stat)
-- distribute_images distributes the images of vertex X under the generators
-- to the workers determined by the hash; some ore all of of the Credit is
-- used to send the messages, the remaining credit is returned;
-- computation and sending of vertices is actually done asynchronously.
-- Precondition: Credit is non-zero.
distribute_images :: ParConf -> Vertex -> Credit -> Process Credit
distribute_images staticMachConf x credit =
do_distribute_images staticMachConf x credit (get_gens staticMachConf)
do_distribute_images :: ParConf -> Vertex -> Credit -> [Sq.Generator]
-> Process Credit
do_distribute_images _ _ credit [] = return credit
do_distribute_images staticMachConf x credit [g] = do
let (k, remainingCredit) = debit_atomic credit
if get_spawn_img_comp staticMachConf
then spawnLocal (send_image staticMachConf x g k) >> return ()
else send_image staticMachConf x g k
return remainingCredit
do_distribute_images staticMachConf x credit (g : gs) = do
let (k, nonZeroRemainingCredit) = debit_atomic_nz credit
if get_spawn_img_comp staticMachConf
then spawnLocal (send_image staticMachConf x g k) >> return ()
else send_image staticMachConf x g k
return nonZeroRemainingCredit
-- distribute_vertices distributes the list of vertices Xs to the workers
-- determined by the hash; some ore all of of the Credit is used to send
-- the messages, the remaining credit is returned.
-- Precondition: If Xs is non-empty then Credit must be non-zero.
distribute_vertices :: ParConf -> Credit -> Credit -> Process Credit
distribute_vertices _ credit [] = return credit
distribute_vertices staticMachConf credit [x] = do
let (k, remainingCredit) = debit_atomic credit
send_vertex staticMachConf x k
return remainingCredit
distribute_vetices staticMachConf credit (x : xs) = do
let (k, nonZeroRemainingCredit) = debit_atomic_nz credit
send_vertex staticMachConf x k
distribute_vertices staticMachConf nonZeroRemainingCredit xs
-- send_image sends image of X under G to the worker determined by
-- the hash of G(X); the message is tagged with atomic credit K.
send_image :: ParConf -> Vertex -> Sq.Generator -> ACredit -> Process ()
send_image staticMachConf x g k = send_vertex staticMachConf (g x) k
-- send_vertex hashes vertex X and sends it to the worker determined by
-- the hash; the message is tagged with atomic credit K.
send_vertex :: ParConf -> Vertex -> ACredit -> Process ()
send_vertex staticMachConf x k = send pid ("vertex", x, slot, k)
where (pid, slot) = hash_vertex staticMachConf x
-- hash_vertex computes the two-dimensional hash table slot of vertex X where
-- the first dim is a worker pid and the second a slot in that worker's table.
hash_vertex :: ParConf -> Vertex -> (ProcessId, Int)
hash_vertex staticMachConf x = global_to_local_slot workers globalSlot
where -- get static info
globalTableSize = get_global_table_size staticMachConf
workers = get_workers staticMachConf
-- compute raw hash and slot in global table
globalSlot = (hash x) `rem` globalTableSize
-- global_to_local_slot traverses the list Workers sequentially to translate
-- slot GlobSlot in the global hash table into a two-dimensional local slot
-- {pid, slot}, where 'pid' is the PID of a worker and 'slot' a the slot
-- in that worker's local hash table.
-- Precondition: GlobSlot < sum of TableSize in Workers.
-- Note: This procedure is horribly inefficient (linear in size of Workers);
-- it should be log (size of Workers) at most.
global_to_local_slot :: [(ProcessId, Int, Int)] -> Int -> (ProcessId, Int)
global_to_local_slot ((pid, _, tabSize) : workers) globSlot
| globSlot < tabSize = (pid, globSlot)
| otherwise = global_to_local_slot workers (globSlot - tabSize)
-- auxiliary functions
-- produce readable statistics
worker_stats :: NodeId -> Freq -> Ct -> WorkerStats
worker_stats node frequency statData =
