Monday, November 30, 2015

Parallel Job Scheduling using Topological Sort - Java

The Problem - Parallel job scheduling - Given a set of jobs with durations and precedence
constraints, schedule the jobs (by finding a start time for each) so as to
achieve the minimum completion time, while respecting the constraints.

JobID Duration (current job must complete before these jobIDs)
0     41.0     1 7 9
1     51.0     2
2     50.0
3     36.0
4     38.0
5     45.0
6     21.0     3 8
7     32.0     3 8
8     32.0     2
9     29.0     4 6

We construct a directed graph for this problem (it needs to be acyclic otherwise the constrians would be impossible to satisfy. If job1 requires job2 to be done, job2 must not require job1 to be done before it). We use topological sort but reverse the edge weights so that the spanning tree gives the length of the longest path from source to any other vertex (maximum work is ensured to be done before another job is taken).

Each node represents a job. There are two additional source and sink nodes. The source has edges to all other job nodes with 0 weights. All job nodes have edges terminating on sink nodes with weights that are required to complete that particular job. Topological sort results are stored in edgeTo[] parent link array to reconstruct the spanning tree. The results are then extracted.

Code:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Stack;
 
//Directed + Weighted 
class DWEdge implements Comparable<DWEdge>{
    int from, to;
    double weight;
    
    DWEdge(int from, int to, double weight){
        this.from = from;
        this.to = to;
        this.weight = weight;
    }

    @Override
    public int compareTo(DWEdge o) {
        if(this.weight < o.weight)return 1;
        else if(this.weight > o.weight)return -1;
        else return 0;
    }
}

class TopologicalSorter {
    
    private int E, V;
    private ArrayList<ArrayList<DWEdge>> al;
    private boolean marked[];
    private Stack<Integer> stack;
    
    TopologicalSorter(int V){
        this.V = V; 
        marked = new boolean[V];
        stack = new Stack<>(); 
    }
    
    
    public void dfs(int vertex){  
        if(!marked[vertex]){
            marked[vertex] = true;
            for(DWEdge e:al.get(vertex)){
                int v = e.to;
                dfs(v);
            } 
            stack.push(vertex);
        }
    }
    
    
    public static Stack<Integer> sort(ArrayList<ArrayList<DWEdge>> graph, int sourceID){
        TopologicalSorter ts = new TopologicalSorter(graph.size()); 
        ts.al = graph; 
        ts.dfs(sourceID);
        return ts.stack;
    } 
}

class SSSPTopologicalOrder {
    
    static class Task{
        int id;
        double duration;
        ArrayList<Integer> completeBefore;
        
        Task(int id, double duration, ArrayList<Integer> a){
            this.id = id;
            this.duration = duration;
            completeBefore = new ArrayList<>();
            for(int i:a)completeBefore.add(i);
        }
    }
    
    private ArrayList<ArrayList<DWEdge>> graph;
    private int edgeTo[], edgeToR[];
    private double distTo[];
    private int V;
    private int sourceID, sinkID;
    private ArrayList<ArrayList<Integer>> paths;
    
    SSSPTopologicalOrder(int vertices){
        V = vertices+2;
        graph = new ArrayList<>();
        graph.ensureCapacity(V);
        
        sourceID = V-2;
        sinkID = V-1;
        for(int i=0;i<V;++i)graph.add(new ArrayList<>());
        edgeTo = new int[V];  
        distTo = new double[V];
    }
     
    public void constructGraph(Task[] tasks){ 
        for(Task t:tasks){ 
            graph.get(sourceID).add(new DWEdge(sourceID, t.id, 0));
            graph.get(t.id).add(new DWEdge(t.id, sinkID, -t.duration));
            
            for(int i:t.completeBefore){ 
                graph.get(t.id).add(new DWEdge(t.id, i, -t.duration));
            }
        }
    }
    
    public void relax(DWEdge e){ 
        if(distTo[e.to] > distTo[e.from] + e.weight){ 
            distTo[e.to] = distTo[e.from] + e.weight; 
            edgeTo[e.to] = e.from;  
        }
    }
    
    public void findShortestPaths(){  
        
        Arrays.fill(distTo, Double.POSITIVE_INFINITY);
        distTo[sourceID] = 0; 
        
        Stack<Integer> sortedVertices = TopologicalSorter.sort(graph, sourceID); 
        
        while(!sortedVertices.isEmpty()){ 
            int v = sortedVertices.pop();  
            for(DWEdge e : graph.get(v))  relax(e);       
        }
    } 
    
    
    public static void main(String[] args)throws Exception{
        
        //INPUT BEGIN
        BufferedReader br = new BufferedReader(new FileReader("d:\\input2.txt"));
        int n = Integer.parseInt(br.readLine());
          
        Task[] tasks = new Task[n];
        int count = 0;
        String s ="";
        
        while((s=br.readLine())!=null){
            String d[] = s.split(" ");
            double[] data = new double[d.length];
            for(int i=0;i<d.length;++i){
                data[i] = Double.parseDouble(d[i]);
            }
            
            int id = (int)data[0];
            double duration = data[1];
            ArrayList<Integer> al = new ArrayList<>(); 
            for(int i=2;i<d.length;++i)al.add((int)data[i]); 
            tasks[count++] = new Task(id, duration, al);
        }  
        //INPUT PROCESSING ENDS
        
        SSSPTopologicalOrder d = new SSSPTopologicalOrder(n);
        
        /*
            Construct Graph for Input
            Each job has an edge to source vertex and sink vertex both with 0 weigths.
        */
        d.constructGraph(tasks); 
        
        /*
            Call the routine
        */
        d.findShortestPaths();
         
        /*
            Display the results
        */
        System.out.println("edgeTo contents -> "+Arrays.toString(d.edgeTo));
        System.out.println("Results: ");
        
        ArrayList<DWEdge> timings = new ArrayList<>();
        
        for(int i=0;i<d.V-2;++i){
            timings.add(new DWEdge(d.edgeTo[i], i, d.distTo[i])); 
        }
         
        Collections.sort(timings);
        for(DWEdge e:timings){
            System.out.println("At time ("+(-e.weight)+") transition from "+(e.from==d.sourceID?"Start ":e.from)+" to "+e.to+" ");
        }
    }
}

Output:

edgeTo contents -> [10, 0, 8, 6, 9, 10, 9, 0, 6, 0, 0, 2]
Results:
At time (-0.0) transition from Start  to 0
At time (-0.0) transition from Start  to 5
At time (41.0) transition from 0 to 1
At time (41.0) transition from 0 to 7
At time (41.0) transition from 0 to 9
At time (70.0) transition from 9 to 4
At time (70.0) transition from 9 to 6
At time (91.0) transition from 6 to 3
At time (91.0) transition from 6 to 8
At time (123.0) transition from 8 to 2

Input (from source file: )
10
0 41.0 1 7 9
1 51.0 2
2 50.0
3 36.0
4 38.0
5 45.0
6 21.0 3 8
7 32.0 3 8
8 32.0 2
9 29.0 4 6

Solution Image (Algorithms Part II : coursera.com):


No comments:

Post a Comment