MapReduce


Table of Contents
1. Introduction
2. General Design Concept
3. The Program: A Detailed Outline
4. Weaknesses
5. References
6. Source Code
7. Sample Output


1. Introduction
The purpose of this document is to outline my design choices when implementing MapReduce using threads and synchronization. This document has two parts first briefly explain the general design of this program and then it will provide a detail explanation of each class that were created and the motivation for the design choices chosen. Weaknesses and References are listed at the end.


2. General Design Concept
The number of Map Threads corresponds to the number of files (foo1.txt…. foon.txt) and each Map Thread is to process its corresponding file. Each file is assumed to contain one word per line. The Map Threads is to read each word and to hash the word to a Reduce Thread in which the word’s inverted index will be computed. This is done by using a bounded buffer. Each Reduce Thread has a corresponding bounded buffer which it reads from. Therefore, when the Map Threads passes a word to a Reduce Thread, it simply adds the word into the Reduce Thread’s bounded buffer. The bounded buffer has a maximum size limit (10) and minimum size limit (0), therefore a Map Thread cannot add a new entry to a buffer that is at its maximum capacity and a Reduce Thread cannot “consume” from a buffer that is empty-- the Map Thread will have to wait until the buffer size decreases and the Reduce Thread have to wait until the buffer is not empty before it can continue. When a Reduce Thread finishes, it prints out the inverted index it has created.

This is a “Producer, Consumer” problem where the Map Thread is the Producer and the Reduce Thread is the Consumer. The Map Threads produces information and Reduce Threads consume them.


3. The Program: A Detailed Outline
Extras:
TreeMap map- Stores the global inverted index. The TreeMap contains Strings (words) as keys and an ArrayList of fileLine objects (which will be later discussed in this report) as values.

Main Method:
This program spawns multiple Map Threads and Reduce Threads by taking two numeric inputs from the keyboard. The main method of the MapReduce program takes two numeric inputs from the keyboard using a BufferedReader. It is assumed that the user will enter two numbers (not chars, Strings, doubles... etc.) and that the user will enter a number per line.

Design Alternatives:
Scanner: Instead of a BufferedReader I could use a Scanner object. However, since I will also be reading a file using a BufferedReader, I decided that rather than importing another class to read input from the keyboard (Scanner), I would simply use the same class (BufferedReader).

JOptionPane: Similarly, if I used JOptionPane, I would have to import another class. To keep the program as lightweight as possible, I decided to use BufferedReader.

The first number entered corresponds to the number of Map Threads and the number of Map Threads corresponds to the number of files (foo1.txt…. foon.txt). Each Map Thread is to process its corresponding file and it is assumed that the files to be read are named foo1.txt, foo2.txt… etc. and that each file contains one word per line.
The second number entered corresponds to the number of Reduce Threads. Since each Reduce Thread reads from a unique buffer, the second number entered also corresponds to the number of buffers that will be created.

The main method then initialized three arrays, an array of Map Threads, Reduce Threads, and Buffers. It also creates a SharedCounter that is initialized to the number of Map Threads created. I will discuss the SharedCounter class later in the report.

The program first creates all the Buffer objects and stores it in the Buffer array. The program then created all the Map Threads, passing to each of the Map Threads the array of Buffers, the SharedCounter, and a file to be read, adds them to the Map Thread array, and starts them. Next, the program spawns all the Reduce Threads, adds them to the Reduce Thread array, passes each Reduce Thread a unique buffer, and the Reduce Thread id number (index in the Reduce Thread array) and starts them.

fileLine Class:
A fileLine object stores a filename and a line number.

Word Class:
A Word object contains a String and a fileLine object. The String is the word that is read from a file and the fileLine object stores the location in which the word can be found (the file name and the line number).

Buffer Class:
The Buffer class is the bounded buffer. It contains a queue of Word objects in which the Reduce Thread is to consume (pop) from and the MapThread is to add (push) into. It also contains an integer that states the maximum size the queue can be. The class contains two methods: getData() and setData().

getData() Method:
getData() is called by the Reduce Thread to obtain the Word object in which it uses to construct the inverted index. If the queue size is empty, then the thread waits. Otherwise, the thread is notified (woken up) since there is something to be 'consumed.’ An entry is then popped from the queue and is returned to the requesting Reduce Thread.

setData() Method:
setData is used by Map Threads to pass off information to the Reduce Thread to be processed. If the Buffer’s queue is at its maximum size (10), then a MapThread has to wait until the queue size decreases before adding an entry, otherwise a Map Thread can add an entry to the queue. A single waiting thread that is waiting to access the critical area is then notified (woken up).

Synchronization:
Since both Map and Reduce Threads have access to a Buffer’s queue, it is possible that the queue size can be incremented and decremented at the same time. This can cause problems! Since we do not want more than one thread to be able to change the size of the queue at any point in time, the getData() and setData() methods are synchronized. If a thread calls one of the synchronized methods, the thread now owns a “lock” for the Buffer. This ensures that only one thread at a time is running within the critical area. For example, if a Reduce Thread calls getData() to “pop” a Word object from the queue while a Map Thread is adding Word objects to the Buffer’s queue, the Reduce Thread will have to wait until the Map Thread finishes.

SharedCounter Class:
A SharedCounter object contains a single integer. In this program, the only purpose of the SharedCounter class is to keep track of the number of active Map Threads. Once a Map Thread has completed, the Map Thread decrements the SharedCounter. Reduce Threads keeps track of the counter. If the SharedCounter is zero, then the Reduce Threads know that all the Map Threads have finished and therefore it has finish after its Buffer is empty.

Motivation:
The SharedCounter ensures that the Reduce Threads finish correctly. Before adding a SharedCounter, the Reduce Thread would assume it is finish when its buffer is empty-- however, if the Reduce Thread assumes it has finish, and if there still exist some active Map Threads, then the active Map Threads might push Word objects into the Reduce Thread’s buffer! Therefore, the inverted index for those Word object will never be computed. Therefore, for the Reduce Threads to correctly finish, it has to check that there is no more active Map Threads-- this functionality is provided by the Shared Counter class.

MapThread Class:
A Map Thread opens the file that was passed to it and reads in a word line by line (assuming one word per line). For each word, it creates a fileLine object that stores the filename and the line number of the word. Each word is then hashed by calling the hashing method, hash(). The hashing method hashes a word to a number of a specific reduce thread to be processed (For example, if there are 3 Reduce Threads, the hashing function will hash a word to either the number 0, 1, or 2 corresponding to the first, second, and third Reduce Thread respectively). The Map Thread will then add the word to a Buffer object at the Buffer array index of the hashed value, which will be read by that Buffer object’s corresponding Reduce Thread (ex. BufferArray[1] is read by ReduceThreadArray[1]). If the Map Thread has completed reading and hashing all the words, then it has finished. It will then decrement the SharedCounter.

ReduceThread Class:
A Reduce Thread contains a TreeMap, a Buffer, and a SharedCounter. In its run() method, the Reduce Thread attempts to process information from its Bounded Buffer. If its buffer is not empty, then the Reduce Thread will call getData() on its buffer to “pop” off the Word object (which contains a String word and a fileLine object) from the buffer’s queue. If the word observed from the Word object, already exist in the inverted index then the Reduce Thread will access the word in the TreeMap and append a new fileLine object to the word’s corresponding ArrayList. If the word does not already exist in the inverted index, then the Reduce Thread will create a new ArrayList, append the fileLine object from the Word object to the ArrayList, and add (put) the word and the ArrayList to the inverted index (TreeMap). While the Reduce Thread is updating its inverted index, it is also updating the Global Inverted Index, which contains all the computed inverted index to this point.
Once a Reduce Thread finishes-- which it can determine by checking if the SharedCounter is zero and if its buffer is empty, then it prints the inverted index it has created.


4. Weaknesses:
There are a three weaknesses that I could determine in the program.
1. Assuming two numeric inputs on separate lines from the keyboard.
According to the assignment, I am to “assume that a valid integer is provided by the user.” Therefore, the program does not check for invalid inputs such as characters. Also, I am assuming that a user is entering the number of Map Thread and Reduce Threads on separate lines. If the two numbers are entered on the same line, the program will break.
2. Assuming correct file names and correct file numbers
I am assuming in my program that the files names are: foo1.txt, foo2.txt, … foon.txt. If the file is not name like so, then the program will break. Also I am assuming that the number of Mao Threads entered is equal to the number of files available. If the number of Map Thread entered from the keyboard is greater than the number of files, then the program will break.
3. Assuming one word per line
According to the assignment, “assume that each file contains exactly one word per line.” If this assumption does not hold, the program will give an incorrect output.


5. References:
1. Creating Java Threads
http://docs.oracle.com/javase/tutorial/essential/concurrency/runthread.html
2. Java Queue
http://docs.oracle.com/javase/7/docs/api/java/util/Deque.html


6. Source Code:

MapReduce.java

import java.io.*;
import java.util.*;

public class MapReduce{
	/* 'map' is a TreeMap representing the global inverted index. It contains the word as the 
	 * key and a ArrayList of fileLine objects which records the file name and line number
	 * combination the word can be found at. 'map' is filled by the ReduceThread. */
	static TreeMap<String, ArrayList<fileLine>> map = new TreeMap<String,ArrayList<fileLine>>();

	// fileLine is a class that stores a filename and line number.
	class fileLine{
		String file; //filename
		int line; //line number
		
		fileLine(String f, int l){
			file = f;
			line = l;
		}
		
		public String toString(){
			return (file+ " " +line); //returns the filename and line number
		}
	}
	
	/* This class identifies the word and where it can be found (the filename and the line
	 * in the file it can be found). The filename and line number is the fileLine object which
	 * class is constructed above. */
	class Word{
		String word;
		fileLine stats;
		
		Word(String w, fileLine s){
			word = w;
			stats = s;
		}
	}
	
	/* The Buffer class is the bounded buffer. It contains a queue of 'Words' (the above class)
	 * in which the ReduceThread is to read (pop) from and the MapThread is to push into. */
	class Buffer{
		Deque<Word> buff = new ArrayDeque<Word>(); //the queue of 'Words'
		int maxSize=10; //maximum size as stated in the lab requirements
		
		/* getData() is used by the ReduceThread to obtain the 'Word' to construct its inverted
		 * index. If the queue (buff) is empty, then the thread waits. Otherwise, the thread is
		 * notified that there something to be 'consumed' in the queue (woken up). The entry is 
		 * then popped and returned to the requesting thread. */
		synchronized Word getData() throws InterruptedException{
			if (buff.isEmpty()){ //if empty, wait
				System.out.println("*Buffer if empty! Waiting...");
				wait();
				return null;
			}
			
			System.out.println("Request: Buffer Getting Object....  Word: '"+buff.peekFirst().word +"' ; Buffer Size: "+(buff.size()-1) );
			notify();
			return buff.pop(); 
		}
		
		/* setData is used by the MapThread to pass off information to the ReduceThread. 
		 * If the queue (buff) is at its max size (10), then the MapThread has to wait until
		 * the queue size decreases before adding an entry. If the size of the queue is not at
		 * at its max size, or if the size has decreased, then the MapThread can add an entry
		 * to the queue. A waiting thread is notified (woken up) since there is now something
		 * to "consume". */
		synchronized boolean setData(String w, fileLine s) throws InterruptedException{
			if(buff.size()>=maxSize){
				this.wait();
				System.out.println("*Buffer is full! Waiting...");
			}
			System.out.println("Request: Buffer Adding '"+w+"' ; Buffer Size "+(buff.size()+1));
			buff.push(new Word(w,s));
			notify();
			return true;
		}
	
	}
	
	/* The shared counter-- only one thread can have access to the counter at any time. 
	 * Therefore two threads cannot increment or decrement the counter at the same time */
	class SharedCounter{
		private int counter;
		
		SharedCounter(int c){
			counter = c;
		}
		
		synchronized void dec(){
			counter--;
		}	
		synchronized int getCounter(){
			return counter;
		}
	}
	
	/* Reads words from a file, hashes it, and passes it off to the corresponding thread */
	class MapThread extends Thread{
		Buffer[] buff; //array of all the buffers for each ReduceThread
		String filename; //file in which the MapThread reads
		SharedCounter count; //shared data
		BufferedReader br; //use to read files
		
		MapThread(Buffer[] b, SharedCounter c, String f) throws Exception{
			buff = b;
			filename = f;
			count = c;
			
			br = new BufferedReader(new FileReader(filename)); //allow the file to be read
		}
		
		//the hashing function that hashes words to specific reduce threads
		int hash(String line){
			int hash=0;
			for (int i=0; i<line.length(); i++){
				hash = (2*hash + line.charAt(i)); 
			}
			//makes sure that the hash is not greater than the number of reduce threads
			return hash%buff.length; 
		}
		
		public void run(){
			int lineNum=0;
			String line;
			try {
				line = br.readLine(); //reads a word per line
		
				while (line!=null){ //if it is not the end of the file...
					lineNum++;
					//creates a fileLine object that stores the filename and the line number
					fileLine stats = new fileLine(filename,lineNum);
	
					int threadNum = hash(line.toLowerCase()); //hashes the word
					System.out.println("MT - Hashing '"+line.toLowerCase()+"' to RT # "+threadNum);
					/* adds the word and filename,line number combo to the buffer corresponding
					 * to the hash number */
					buff[threadNum].setData(line.toLowerCase(), stats);
					line = br.readLine(); //reads the word on the next line
				}
				br.close(); //close the buffer reader.
				count.dec(); //decrement the sharedcounter
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	// The ReduceThread consumes the next word in its buffer and constructes the inverted index
	class ReduceThread extends Thread{
		//Current thread's inverted index
		TreeMap<String, ArrayList<fileLine>> invertedIndex = new TreeMap<String,ArrayList<fileLine>>();
		Buffer reduceBuff; //The buffer in which the Reduce Thread consumes from
		SharedCounter count; //shared data (shared with MapThreads)
		int threadID;

		ReduceThread(Buffer b, SharedCounter c, int r){
			reduceBuff = b;
			count = c;
			threadID = r;
		}
		
		public void run(){
			Word current = null;
			//if the shared data is larger than 0 and if the buffer is not empty
			while(count.getCounter()>0 || !reduceBuff.buff.isEmpty()){
				if (!reduceBuff.buff.isEmpty()){ // if the buffer if not empty
					try {
						current = reduceBuff.getData(); //gets a word (consumes)
						System.out.println("RT - Calculating Inverted Index... Here is the word: "+current.word);
						ArrayList<fileLine> start; //declares an ArrayList
						
						/* if the word already exist in 'map' (inverted index), then look up the
						 * word in the inverted index and add the new filename and line number
						 * combination to its already existing fileLine ArrayList */
						if (map.containsKey(current.word)){
							map.get(current.word).add(current.stats);
						}
						//Calculates this thread's invertedIndex
						if (invertedIndex.containsKey(current.word)){
							if(invertedIndex.get(current.word).contains(current.stats)){}
							else
								invertedIndex.get(current.word).add(current.stats);
						}
						/* if the word does not exist, then create a new ArrayList with the word
						 * and the filename and line number combination and then add the word 
						 * and the ArrayList of the fileLine object to the TreeMap 'map' */
						else{
							start= new ArrayList<fileLine>();
							start.add(current.stats);
							map.put(current.word, start); //global invertedIndex
							invertedIndex.put(current.word, start); //thread's inverted Index
						}

					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
			//prints out the inverted index created by this thread
			System.out.println("Reduce Thread #"+ threadID +"'s inverted index: "+invertedIndex);
		}
	}

	public static void main (String[]args) throws Exception{
		MapReduce outer= new MapReduce(); //creates a MapReduce object
		
		//Request inputs from the keyboard: the number of MapThreads, and the number of ReduceTjreads
		System.out.println("Enter on separte lines: \na. The number of Map Threads \nb. The number of Reduce threads");
		BufferedReader inFromUser = new BufferedReader(new InputStreamReader(System.in));
		int mapNum = Integer.parseInt(inFromUser.readLine());//number of Map Threads
		int reduceNum = Integer.parseInt(inFromUser.readLine());//number of Reduce Threads
		inFromUser.close(); //close the BufferReader
	
		MapThread[] mthreadArray = new MapThread[mapNum]; //array of MapThreads
		ReduceThread[] rthreadArray = new ReduceThread[reduceNum]; //array of ReduceThreads
		Buffer[] buff = new Buffer[reduceNum]; //array of Buffers
		SharedCounter counter = outer.new SharedCounter(mapNum); //shared data

		/* spawns all reduceThread Buffers. Each buffer corresponds to a ReduceThread. For
		 * example, buff[0] is the buffer for rthreadArrayy[0] (array of ReduceThreads) */
		for (int b=0; b<reduceNum; b++)
			buff[b] = outer.new Buffer();

		/* Spawns a map thread for each file1, file2.... file_mapNum, dds them to the MapThread
		 * array and start all the threads. */
		for(int m=0; m<mapNum;m++){
			String filename = "foo"+(m+1)+".txt";
			mthreadArray[m]=outer.new MapThread(buff, counter, filename);
			mthreadArray[m].start();
		}

		/* spawns reduce threads 0.... reduceNum, adds them to the ReduceThread array and start
		 * the threads */
		for (int r=0; r<reduceNum; r++){
			rthreadArray[r] = outer.new ReduceThread(buff[r], counter, r);
			rthreadArray[r].start();
		}
		
	}

}


7. Sample Output


Enter on separte lines: 
a. The number of Map Threads 
b. The number of Reduce threads
2
5
MT - Hashing 'hello' to RT # 1
Request: Buffer Adding 'hello' ; Buffer Size 1
MT - Hashing 'world' to RT # 4
Request: Buffer Adding 'world' ; Buffer Size 1
MT - Hashing 'my' to RT # 4
Request: Buffer Adding 'my' ; Buffer Size 2
MT - Hashing 'name' to RT # 2
Request: Buffer Adding 'name' ; Buffer Size 1
MT - Hashing 'is' to RT # 0
Request: Buffer Adding 'is' ; Buffer Size 1
MT - Hashing 'world' to RT # 4
Request: Buffer Adding 'world' ; Buffer Size 3
MT - Hashing 'hello' to RT # 1
Request: Buffer Getting Object....  Word: 'hello' ; Buffer Size: 0
RT - Calculating Inverted Index... Here is the word: hello
Request: Buffer Getting Object....  Word: 'is' ; Buffer Size: 0
RT - Calculating Inverted Index... Here is the word: is
Request: Buffer Adding 'hello' ; Buffer Size 1
Request: Buffer Getting Object....  Word: 'world' ; Buffer Size: 2
RT - Calculating Inverted Index... Here is the word: world
Request: Buffer Getting Object....  Word: 'my' ; Buffer Size: 1
RT - Calculating Inverted Index... Here is the word: my
Request: Buffer Getting Object....  Word: 'world' ; Buffer Size: 0
RT - Calculating Inverted Index... Here is the word: world
Reduce Thread #4's inverted index: {my=[foo2.txt 1], world=[foo2.txt 4, foo1.txt 2]}
Global inverted index: {hello=[foo1.txt 1], is=[foo2.txt 3], my=[foo2.txt 1], world=[foo2.txt 4, foo1.txt 2]}
Request: Buffer Getting Object....  Word: 'name' ; Buffer Size: 0
RT - Calculating Inverted Index... Here is the word: name
Reduce Thread #2's inverted index: {name=[foo2.txt 2]}
Global inverted index: {hello=[foo1.txt 1], is=[foo2.txt 3], my=[foo2.txt 1], name=[foo2.txt 2], world=[foo2.txt 4, foo1.txt 2]}
Request: Buffer Getting Object....  Word: 'hello' ; Buffer Size: 0
RT - Calculating Inverted Index... Here is the word: hello
Reduce Thread #0's inverted index: {is=[foo2.txt 3]}
Reduce Thread #1's inverted index: {hello=[foo1.txt 1, foo2.txt 5]}
Global inverted index: {hello=[foo1.txt 1, foo2.txt 5], is=[foo2.txt 3], my=[foo2.txt 1], name=[foo2.txt 2], world=[foo2.txt 4, foo1.txt 2]}
Global inverted index: {hello=[foo1.txt 1, foo2.txt 5], is=[foo2.txt 3], my=[foo2.txt 1], name=[foo2.txt 2], world=[foo2.txt 4, foo1.txt 2]}
Reduce Thread #3's inverted index: {}
Global inverted index: {hello=[foo1.txt 1, foo2.txt 5], is=[foo2.txt 3], my=[foo2.txt 1], name=[foo2.txt 2], world=[foo2.txt 4, foo1.txt 2]}