From 3f177aa3981417eb684b99ce1a88393ef6b7845d Mon Sep 17 00:00:00 2001 From: Aggelos Giantsios <aggelgian@gmail.com> Date: Sun, 16 Nov 2014 03:17:39 +0200 Subject: [PATCH] Elaborate on Hosts type, add few funs to Master First attempt to use Cloud Haskell's primitives --- Master.hs | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++- Types.hs | 7 ++-- 2 files changed, 117 insertions(+), 5 deletions(-) diff --git a/Master.hs b/Master.hs index 78114af..f431716 100644 --- a/Master.hs +++ b/Master.hs @@ -4,11 +4,18 @@ module Master () where import Control.Distributed.Process - ( ProcessId + ( Process + , ProcessId + , receiveWait + , match ) import qualified Sequential as Sq ( orbit ) +import Credit + ( is_one + , credit + ) import Table ( sum_freqs , freq_from_stat @@ -24,13 +31,117 @@ import WorkerAux ) import Types ( Generator - , Host + , HostInfo(..) , MaybeHosts(..) , ParConf , Stats , Vertex ) +-- DATA +-- Static Machine Configuration: +-- {Gs, %list of generators +-- Master, %pid of master process +-- Workers, %list of Worker +-- GlobalTableSize, %size of global hash table +-- IdleTimeout, %milliseconds this worker idles before sending 'done' +-- SpawnImgComp} %true iff this worker spawns image computations +-- +-- Worker: +-- {Pid, %pid of worker process +-- TableOffset, %offset (= index 0) of local table into global table +-- TableSize} %size of local hash table +-- +-- Host: +-- {Node, %atom naming Erlang node +-- Procs, %number of processors +-- TableSize, %size of hash table per processor +-- IdleTimeout} %milliseconds a processor idles before sending 'done' +-- +-- Statistics: +-- List of pairs where the first component is an atom, the second +-- some data. Part of the data is the fill frequency of the table +-- (a list whose ith element indicates frequency of filling degree i). + + +-- MESSAGES +-- Master -> Worker: {init, StaticMachConf} +-- +-- Master/Worker -> Worker: {vertex, X, Slot, K} +-- %X is vertex +-- %Slot is slot of X on target worker +-- %K is atomic credit shipped with vertex +-- +-- Worker -> Master: {done, Cs} +-- %Cs is non-zero credit (rep as list of ints) +-- +-- Master -> Worker: {dump} +-- +-- Worker -> Master: {result, Xs, Stats} +-- %Xs is list of found orbit vertices +-- %Stats is statistics about worker's table + + +-- compute orbit of elements in list Xs under list of generators Gs; +-- the argument Hosts is either an integer N, a triple {P, N, T}, or +-- a non-empty list [{H, P, N, T} | ...] of quadruples: +-- * N: run the sequential algorithm with table size N +-- * {P, N, T, S}: run the parallel algorithm on P processors +-- each with table size N, idle timeout T and +-- spawn image computation flag S; +-- * [{H, P, N, T, S} | ...]: run the distributed algorithm on the list of +-- hosts, where each quintuple {H, P, N, T, S} +-- specifies +-- * host name H (ie. name of Erlang node), +-- * number of processors P on H, +-- * table size N (per processor), +-- * idle timeout T, and +-- * spawn image computation flag S. +-- The function returns a pair consisting of the computed orbit and +-- a list of statistics, the first element of which reports overall statistics, +-- and all remaining elements report statistics of some worker. +orbit :: [Vertex -> Vertex] -> [Vertex] -> MaybeHosts -> ([Vertex], [Stats]) +orbit gs xs (Seq tablesize) = Sq.orbit gs xs tablesize +orbit gs xs (Par hostInfo) = par_orbit gs xs hostInfo + + +-- FIXME Write the proper par_orbit +par_orbit :: [Vertex -> Vertex] -> [Vertex] -> HostInfo -> ([Vertex], [Stats]) +par_orbit gs xs hosts = ([42], [[("xxx", "xxx")]]) + + + + + + + + +-- collect_credit collects leftover credit from idle workers until +-- the credit adds up to 1. +collect_credit :: [Int] -> Process () +collect_credit crdt = + case is_one crdt of + True -> return () + False -> receiveWait [ + match $ \("done", workersCredit) -> + collect_credit $ credit workersCredit crdt + ] + +-- collect_orbit collects partial orbits and stats from N workers. +collect_orbit :: Int -> Int -> (Process [Vertex], Process [Stats]) +collect_orbit elapsedTime n = (orbit, stats) + where x = do_collect_orbit n [] [] + orbit = x >>= (\(partOrBits, _) -> return $ concat partOrBits) + stats = x >>= (\(_, workerStats) -> return $ (master_stats elapsedTime workerStats) : workerStats) + +do_collect_orbit :: Int -> [[Vertex]] -> [Stats] -> Process ([[Vertex]], [Stats]) +do_collect_orbit 0 partOrbits workerStats = return (partOrbits, workerStats) +do_collect_orbit n partOrbits workerStats = do + receiveWait [ + match $ \("result", partOrbit, workerStat) -> + do_collect_orbit (n - 1) (partOrbit : partOrbits) (workerStat : workerStats) + ] + ------------------------------------------------------------------------------- -- auxiliary functions diff --git a/Types.hs b/Types.hs index 59b4716..7fb9862 100644 --- a/Types.hs +++ b/Types.hs @@ -4,7 +4,7 @@ module Types ( Ct(..) , Generator , Freq - , Host + , HostInfo(..) , MaybeHosts(..) , ParConf , SeqConf @@ -25,9 +25,10 @@ type Vertex = Int type VTable = Array Int [Vertex] type Stats = [(String, String)] type Generator = Vertex -> Vertex -type Host = (NodeId, Int, Int, Int) -- Node, Procs, TableSize, IdleTimeout data MaybeHosts = Seq Int - | Par [Host] + | Par HostInfo +data HostInfo = JustOne (Int, Int, Int, Bool) -- Procs, TableSize, IdleTimeout, SpwnImgComp + | Many [(NodeId, Int, Int, Int, Bool)] -- NodeId, Procs, TableSize, IdleTimeout, SpwnImgComp type SeqConf = ([Generator], Int) type ParConf = ([Generator], ProcessId, [ProcessId], Int, Int, Bool) -- 2.18.1