Implementing MapReduce in Java

Introduction

MapReduce is a distributed computing model designed for processing large datasets. It splits data processing into a Map phase and a Reduce phase and gains efficiency by running both in parallel; for word counting, for example, the Map phase emits a ("word", "1") pair for every occurrence, and the Reduce phase counts the pairs for each word. This article demonstrates how to implement a simple MapReduce framework in Java, modeled on the functionality described in Google's MapReduce paper [1].

System Architecture

Our Java implementation uses a Master-Worker architecture. The main components are:

  1. Master: Responsible for task assignment and scheduling
  2. Worker: Executes specific Map/Reduce tasks
  3. Task: Represents information about a Map or Reduce task (a hedged sketch of what this class might look like follows the list)
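
The article does not reproduce the Task class itself, so the following is only a minimal sketch of what it might look like, inferred from the getters used later in the MapF/ReduceF snippets (getTaskID(), getnMap(), getnReduce()); the field and method names beyond those getters are assumptions.

java
// Hypothetical Task carrier object (a sketch, not the article's actual code).
// It must be Serializable so that it can travel over RMI between Master and Worker.
public class Task implements java.io.Serializable {
    public enum Type { MAP, REDUCE, WAIT, DONE }

    private final Type type;       // whether this is a Map or a Reduce task
    private final int taskID;      // index of the task within its phase
    private final String filename; // input split, used by Map tasks
    private final int nMap;        // total number of Map tasks
    private final int nReduce;     // total number of Reduce tasks

    public Task(Type type, int taskID, String filename, int nMap, int nReduce) {
        this.type = type;
        this.taskID = taskID;
        this.filename = filename;
        this.nMap = nMap;
        this.nReduce = nReduce;
    }

    public Type getType() { return type; }
    public int getTaskID() { return taskID; }
    public String getFilename() { return filename; }
    public int getnMap() { return nMap; }
    public int getnReduce() { return nReduce; }
}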

Core Implementation

KeyValue Class

The KeyValue class represents key-value pairs:

java
public class KeyValue implements Comparable<KeyValue> {
    public String key;
    public String value;
    
    public KeyValue(String key, String value) {
        this.key = key;
        this.value = value;
    }
    
    @Override
    public int compareTo(KeyValue o) {
        return key.compareTo(o.key);
    }
}

Map Process

Map phase workflow: a Worker reads an input file, applies the map function to produce key-value pairs, partitions the pairs by key hash, and writes each partition to an intermediate file on local disk.

Map function implementation (word counting):

java
public static List<KeyValue> sequentialMap(String filename, String content) {
    // Treat every maximal run of letters as one word.
    Pattern pattern = Pattern.compile("[a-zA-Z]+");
    Matcher matcher = pattern.matcher(content);
    List<String> words = new ArrayList<>();
    while (matcher.find()) {
        words.add(matcher.group());
    }
    // Emit ("word", "1") for each occurrence; the counting happens in the Reduce phase.
    List<KeyValue> result = new ArrayList<>();
    for (String word : words) {
        result.add(new KeyValue(word, "1"));
    }
    return result;
}
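
As a quick illustration (hypothetical driver code, not part of the framework), calling sequentialMap on a short string emits one pair per word occurrence:

java
// Hypothetical usage of sequentialMap for word counting.
List<KeyValue> kvs = sequentialMap("doc1.txt", "the quick brown fox jumps over the lazy dog");
for (KeyValue kv : kvs) {
    System.out.println(kv.key + " -> " + kv.value);
}
// Prints "the -> 1" twice (once per occurrence); the actual counting
// is deferred to the Reduce phase.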

Reduce Process

Reduce phase workflow: a Worker collects all intermediate files destined for its partition, sorts the key-value pairs by key, groups pairs with the same key, applies the reduce function to each group, and writes the results to an output file.

Reduce function implementation (word counting):

java
public static String sequentialReduce(String key, List<String> values) {
    // Each value is "1" (one per occurrence of the word), so the count is simply the list size.
    return String.valueOf(values.size());
}
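
The framework invokes the reduce function once per distinct key, passing every value collected for that key. A hypothetical call:

java
// Hypothetical usage: three occurrences of "fox" reduce to the count "3".
String count = sequentialReduce("fox", List.of("1", "1", "1"));
System.out.println("fox -> " + count); // fox -> 3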

Task Scheduling

The Master is responsible for task assignment and scheduling. Its core workflow is as follows: Workers poll the Master for work; the Master hands out pending Map tasks first, waits until every Map task has finished, then hands out Reduce tasks, and reassigns any task that does not complete within a timeout.
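
The article does not reproduce the MasterImpl source, so the following is only a sketch of what the bookkeeping for a single phase (Map or Reduce) could look like; the names Scheduler, assignTask, completeTask, and TIMEOUT_MS are assumptions, not the actual code. A real Master would run one such phase for the Map tasks and, once all of them have completed, a second one for the Reduce tasks.

java
import java.util.*;

// Hypothetical sketch of per-phase task bookkeeping in the Master (assumed names).
class Scheduler {
    private static final long TIMEOUT_MS = 10_000;   // reassign tasks not finished within 10s

    private final Map<Integer, Long> startTimes = new HashMap<>(); // taskID -> time of last assignment
    private final Set<Integer> finished = new HashSet<>();
    private final int nTasks;

    Scheduler(int nTasks) { this.nTasks = nTasks; }

    // Called (via RMI) when a Worker asks for work; returns a task ID, or -1 if nothing is ready.
    synchronized int assignTask() {
        long now = System.currentTimeMillis();
        for (int id = 0; id < nTasks; id++) {
            if (finished.contains(id)) continue;
            Long started = startTimes.get(id);
            // Hand out tasks that were never assigned, or whose Worker appears to have died.
            if (started == null || now - started > TIMEOUT_MS) {
                startTimes.put(id, now);
                return id;
            }
        }
        return -1; // everything is either done or still running within its timeout
    }

    // Called when a Worker reports that a task's output files are in place.
    synchronized void completeTask(int id) { finished.add(id); }

    synchronized boolean allDone() { return finished.size() == nTasks; }
}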

Data Flow Between Map and Reduce

The data flow between Map task outputs and Reduce task inputs is a crucial part of the MapReduce framework. In our implementation, this process includes several important steps:

  1. Partition Writing

After processing data, Map tasks write results to temporary files on local disk. To ensure data is correctly distributed to different Reduce tasks, we use a partitioning strategy:

java
// Partition writing logic in MapF.java
// Prepare one bucket per Reduce task (nReduce buckets in total)
List<List<KeyValue>> intermediate = new ArrayList<>();
for (int i = 0; i < task.getnReduce(); i++) {
    intermediate.add(new ArrayList<>());
}

for (KeyValue kv : kvl) {
    // Use the key's hash value modulo the number of Reduce tasks to pick the partition
    intermediate.get(hash(kv.key) % task.getnReduce()).add(kv);
}

// Write each partition's data to the corresponding temporary file.
// template is assumed to be the line format "%s %s\n" (key, value), matching what ReduceF parses back.
for (int i = 0; i < task.getnReduce(); i++) {
    String oName = "mr-%d-%d".formatted(task.getTaskID(), i);
    File oFile = new File(oName);
    try (FileWriter fw = new FileWriter(oFile, true)) {
        for (KeyValue kv : intermediate.get(i)) {
            fw.append(template.formatted(kv.key, kv.value));
        }
    }
}
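
The hash helper is not shown in the article. One subtle point: String.hashCode() can be negative, which would make the modulo expression above produce a negative index, so a plausible version of the helper (the name and placement are assumptions) masks the sign bit:

java
// Hypothetical helper (not shown in the article): map a key to a nonnegative
// partition index so that hash(key) % nReduce never goes negative.
private static int hash(String key) {
    return key.hashCode() & Integer.MAX_VALUE;
}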

The key points about temporary file naming:

  • mr-X-Y: X is the Map task ID, Y is the target Reduce task ID (for example, mr-2-3 holds the part of Map task 2's output destined for Reduce task 3)
  • This naming convention ensures each Reduce task can accurately locate its input

  2. Reduce Task Data Collection

When a Reduce task starts, it collects all temporary files generated for it by Map tasks:

java
// Data collection logic in ReduceF.java
List<KeyValue> intermediate = new ArrayList<>();
for (int i = 0; i < task.getnMap(); i++) {
    // Construct the corresponding temporary filename: the output of Map task i for this Reduce task
    String iName = "mr-%d-%d".formatted(i, task.getTaskID());
    File iFile = new File(iName);
    
    // Read the whole file; readAllBytes avoids the short-read problem of a single read() call
    try (FileInputStream fs = new FileInputStream(iFile)) {
        String[] lines = new String(fs.readAllBytes(), StandardCharsets.UTF_8).split("\n");
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // skip blank lines so split(" ") below always yields key and value
            }
            String[] content = line.split(" ");
            intermediate.add(new KeyValue(content[0], content[1]));
        }
    }
}

// Sort the collected data so that entries with equal keys end up adjacent
Collections.sort(intermediate);

  3. Data Merge Processing

After collecting data, the Reduce task merges data with the same key:

java
int i = 0;
while (i < intermediate.size()) {
    int j = i + 1;
    // Find the end of the run of entries sharing the current key
    // (the list is already sorted, so equal keys are adjacent)
    while (j < intermediate.size() && 
           intermediate.get(j).key.equals(intermediate.get(i).key)) {
        j++;
    }
    
    // Collect all values for the same key
    List<String> values = new LinkedList<>();
    for (int k = i; k < j; k++) {
        values.add(intermediate.get(k).value);
    }
    
    // Call the reduce function (e.g. sequentialReduce) for this key
    String output = reduceF.apply(intermediate.get(i).key, values);
    
    // Append the result to the Reduce task's final output file (oFile)
    try (FileWriter fw = new FileWriter(oFile, true)) {
        String content = "%s %s\n".formatted(intermediate.get(i).key, output);
        fw.append(content);
    }
    
    // Continue with the next distinct key
    i = j;
}

The key points of this process are:

  1. Using hash partitioning to ensure data with the same key goes to the same Reduce task
  2. Using standardized file naming to ensure correct data flow
  3. Sorting data in the Reduce phase for easier grouping
  4. Automatic management of temporary files (creation and cleanup) by the framework

This design ensures data flows correctly from Map tasks to Reduce tasks while supporting fault tolerance: if a task fails, only the corresponding Map or Reduce task needs to be re-executed.

Usage Example

  1. Start the Master server:
java
// Assumes an RMI registry is reachable on port 1099 (e.g. started beforehand with
// LocateRegistry.createRegistry(1099) or the rmiregistry tool) and that readFiles()
// returns the list of input files. Master is a java.rmi.Remote interface;
// a hypothetical sketch of it follows the usage example.
public static void main(String[] args) throws Exception {
    Master master = new MasterImpl(readFiles(), 10); // second argument: number of Reduce tasks (assumed)
    Naming.bind("rmi://localhost:1099/master", master);
    System.out.println("RMI server started.");
}

  2. Start a Worker:
java
public static void main(String[] args) {
    Worker worker = new Worker();
    worker.work(MapF::sequentialMap, ReduceF::sequentialReduce);
}
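
Both entry points communicate over Java RMI: the Master is bound in the registry, and the Worker presumably looks it up with Naming.lookup before requesting tasks. For this to work, Master must be a remote interface. The article does not show it, so the sketch below (which reuses the Task sketch from the System Architecture section; the method names are assumptions) only illustrates the shape such an interface could take:

java
import java.rmi.Remote;
import java.rmi.RemoteException;

// Hypothetical remote interface for the Master; method names are assumptions.
public interface Master extends Remote {
    // Called by a Worker to request the next Map or Reduce task
    Task assignTask() throws RemoteException;

    // Called by a Worker to report that a task's output files are in place
    void completeTask(Task task) throws RemoteException;
}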

Performance Optimization

  1. Task Granularity
  • Set an appropriate number of Map and Reduce tasks
  • Too many tasks cause scheduling overhead
  • Too few tasks prevent full use of the available parallelism
  2. Data Locality
  • Prefer assigning Workers data that is stored locally
  • This reduces network transfer overhead
  3. Fault Tolerance
  • Timeout-based retries
  • Reassignment of failed tasks

Summary

This article demonstrated how to implement a simple MapReduce framework in Java. Using a Master-Worker architecture, we implemented distributed computing supporting parallel processing of large datasets. Key points include:

  1. Two-phase Map/Reduce processing
  2. Distributed task scheduling
  3. Fault tolerance mechanism
  4. Parallel computation

While this implementation is simple, it captures the core concepts and basic workflow of MapReduce and should help in understanding how MapReduce works.

References

  1. Dean, Jeffrey, and Sanjay Ghemawat. "MapReduce: simplified data processing on large clusters." Communications of the ACM 51.1 (2008): 107-113.
  2. Kleppmann, Martin. "Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems." O'Reilly Media, 2017.

License

This article is licensed under CC BY-NC-SA 4.0. You are free to:

  • Share — copy and redistribute the material in any medium or format
  • Adapt — remix, transform, and build upon the material

Under the following terms:

  • Attribution — You must give appropriate credit, provide a link to the license, and indicate if changes were made. You may do so in any reasonable manner, but not in any way that suggests the licensor endorses you or your use.
  • NonCommercial — You may not use the material for commercial purposes.
  • ShareAlike — If you remix, transform, or build upon the material, you must distribute your contributions under the same license as the original.
