Wednesday, May 17, 2017

Hadoop [MapReduce tutorial]

Program to create wordcount
---------------------------
Mapper logic
------------
    map(LongWritable k, Text v, Context con)

step1: Convert the Text value to a String.

  String str = v.toString();

step2: Divide the string into tokens using the StringTokenizer class.

StringTokenizer is a class used to divide a string into tokens based on a delimiter; the default delimiter is whitespace.

It has 2 methods

i) public boolean hasMoreTokens()
checks whether another token is available; returns true if one is, otherwise false.

ii) public String nextToken()
returns the next token and moves the cursor forward.
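These two methods can be tried in a small standalone program (plain Java, no Hadoop needed):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class TokenizerDemo {
    // Splits a line into tokens using the default delimiter (whitespace).
    public static List<String> tokenize(String line) {
        List<String> result = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {      // true while tokens remain
            result.add(tokens.nextToken());   // returns token, advances cursor
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("hadoop is good")); // [hadoop, is, good]
    }
}
```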


Dividing a string into tokens
-----------------------------
StringTokenizer tokens=new StringTokenizer(str);

Once the string is divided into tokens, iterate over them one by one using the above methods.

while(tokens.hasMoreTokens())
{
    String token=tokens.nextToken();
    con.write(new Text(token),new IntWritable(1));
}

Here, once we get a token, we write it to the Context using the method write().

We initialize each word with 1:
the word is the key,
the value is 1.

This is how map() converts each Text line into key/value pairs.
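To see this conversion concretely, here is a plain-Java sketch of the mapper logic, using String/Integer pairs in place of Hadoop's Text/IntWritable:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

public class MapSimulation {
    // Emits (word, 1) for every token, mimicking the mapper's con.write().
    public static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            pairs.add(new SimpleEntry<>(tokens.nextToken(), 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // "I love hadoop" -> (I,1) (love,1) (hadoop,1)
        for (Map.Entry<String, Integer> p : map("I love hadoop")) {
            System.out.println(p.getKey() + "\t" + p.getValue());
        }
    }
}
```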


creating a MapReduce Program
---------------------------
Open Eclipse-->File-->New-->Java Project-->
Name as-->WordCountProj-->Finish

Add a new package
-----------------
Go to src-->New-->Package-->com.wordcount

Add a Mapper class
------------------
Go to src-->com.wordcount-->New-->Class-->Name as-->MyMapper

package com.wordcount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException {
        // step1: convert the Text value to a String
        String str = v.toString();

        // step2: divide the string into tokens
        StringTokenizer tokens = new StringTokenizer(str);

        // step3: write each token to the Context with an initial count of 1
        while (tokens.hasMoreTokens()) {
            String word = tokens.nextToken();
            con.write(new Text(word), new IntWritable(1));
        }
    }
}

creating a Reducer class
------------------------
step1:
Extend the Reducer class from the superclass Reducer available in the org.apache.hadoop.mapreduce package.

Reducer is represented in generic form

Reducer<Text, IntWritable, Text, IntWritable>
        k1    v1           k2    v2

step2: override the reduce() method available in the Reducer class

public void reduce(Text k,Iterable<IntWritable> v,Context con)


Text-->the key
IntWritable-->the value

k1,v1 is the input coming from the mapper after it converts each line into key/value pairs.

k2,v2 is the key/value pair generated by the Reducer after aggregating the values.

The parameters of the reduce method (Text, Iterable<IntWritable>) are the inputs coming from the Mapper.

Context -->is used to write final output of Reducer to a file.
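The summing logic the reducer applies to each key's grouped values can be sketched in plain Java (Integer standing in for IntWritable):

```java
import java.util.Arrays;

public class ReduceSimulation {
    // Sums the list of counts for one key, mimicking the reducer body.
    public static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // key "hadoop" with grouped values [1, 1] -> 2
        System.out.println("hadoop\t" + reduce(Arrays.asList(1, 1)));
    }
}
```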

Reducer logic
-------------
package com.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text k, Iterable<IntWritable> list, Context con)
            throws IOException, InterruptedException {
        // sum all the 1s grouped under this key
        int sum = 0;
        for (IntWritable val : list) {
            sum = sum + val.get();
        }
        // write the final (word, count) pair
        con.write(k, new IntWritable(sum));
    }
}

creating driver class
---------------------
A driver class is used to provide configuration details of MapReduce application.


we should provide the following configuration details in a driver class.

i)Jar class
ii)Mapper class name
iii)Reducer class name
iv)Output key class
v)Output value class
vi)File input path
vii)File output path



To set the above config details we need the Job class in MapReduce.
A Job is created from a Configuration object.
The Configuration reads the config details and passes them to the Job.


creating a driver class
-----------------------
step1:
Configuration c = new Configuration();

step2:
Job j = new Job(c, "MyFirst");

setting properties
------------------
j.setJarByClass(MyDriver.class);
j.setMapperClass(MyMapper.class);
j.setReducerClass(MyReducer.class);

j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);


Path p1 = new Path(args[0]); // input
Path p2 = new Path(args[1]); // output

args[0]-->first argument submitted from the command line
args[1]-->second argument submitted from the command line


FileInputFormat.addInputPath(j, p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0 : 1);


org.apache.hadoop.conf.*-->Configuration
org.apache.hadoop.fs.*-->Path
org.apache.hadoop.mapreduce.*-->Job
org.apache.hadoop.mapreduce.lib.input.*-->FileInputFormat
org.apache.hadoop.mapreduce.lib.output.*-->FileOutputFormat

creating MyDriver.java
----------------------
package com.wordcount;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class MyDriver
{
    public static void main(String args[]) throws Exception
    {
        // step1: create a Configuration
        Configuration c = new Configuration();
        // step2: create a Job from the Configuration
        Job j = new Job(c, "MyFirst");
        // setting properties
        j.setJarByClass(MyDriver.class);
        j.setMapperClass(MyMapper.class);
        j.setReducerClass(MyReducer.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path p1 = new Path(args[0]); // input
        Path p2 = new Path(args[1]); // output

        FileInputFormat.addInputPath(j, p1);
        FileOutputFormat.setOutputPath(j, p2);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}


MapReduce program to process emp dataset
----------------------------------------
hive>select gender,sum(sal) from empdata group by gender;

Write an equivalent program in mapreduce to find sum(sal) group by gender.

create Project--->SumProject
Add a package-->SumProject-->src-->com.manohar
Add 3 classes
MyMapper.java
MyReducer.java
MyDriver.java
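Before writing the Hadoop classes, the whole group-by-gender sum can be sketched in plain Java with a map; the record layout (id,name,sal,gender,dno) matches the mapper below:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GenderSumDemo {
    // Groups salaries by gender: the "map" step extracts (gender, sal),
    // the "reduce" step sums per key. Record layout: id,name,sal,gender,dno.
    public static Map<String, Double> sumByGender(String[] lines) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] w = line.split(",");
            String gender = w[3];
            double sal = Double.parseDouble(w[2]);
            totals.merge(gender, sal, Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] emp = { "1,aaa,1000,m,11", "2,bbb,2000,f,12", "3,ccc,3000,m,11" };
        System.out.println(sumByGender(emp)); // {m=4000.0, f=2000.0}
    }
}
```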


MyMapper.java
-------------
package com.manohar;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException {
        // record layout: id,name,sal,gender,dno
        String str = v.toString();
        String w[] = str.split(",");

        String gender = w[3];
        double sal = Double.parseDouble(w[2]);
        con.write(new Text(gender), new DoubleWritable(sal));
    }
}


MyReducer.java
--------------
package com.manohar;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    public void reduce(Text k, Iterable<DoubleWritable> list, Context con)
            throws IOException, InterruptedException {
        // sum all salaries grouped under this gender
        double sum = 0.0;
        for (DoubleWritable d : list) {
            sum = sum + d.get();
        }
        con.write(k, new DoubleWritable(sum));
    }
}

MyDriver.java
-------------
package com.manohar;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration c = new Configuration();
        Job j = new Job(c, "MyFirst");
        // setting properties
        j.setJarByClass(MyDriver.class);
        j.setMapperClass(MyMapper.class);
        j.setReducerClass(MyReducer.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(DoubleWritable.class);

        Path p1 = new Path(args[0]); // input
        Path p2 = new Path(args[1]); // output

        FileInputFormat.addInputPath(j, p1);
        FileOutputFormat.setOutputPath(j, p2);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}

select dno, sum(sal) from emp
   group by dno;
select sex, avg(sal) from emp
   group by sex;
select sex, count(sal) from emp
   group by sex;
select sex, max(sal) from emp
   group by sex;
select dno, sex, sum(sal) from emp
   group by dno, sex;

String line = v.toString();
String[] w = line.split(",");
String sex = w[3];
String dno = w[4];
String myKey = dno + "\t" + sex;
int sal = Integer.parseInt(w[2]);
con.write(new Text(myKey), new IntWritable(sal));
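A plain-Java sketch of this composite-key mapper plus a summing reducer (record layout id,name,sal,sex,dno, as in the snippet above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompositeKeyDemo {
    // Builds the "dno\tsex" composite key and sums salaries per key,
    // mirroring the mapper snippet above plus a summing reducer.
    public static Map<String, Integer> sumByDnoSex(String[] lines) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] w = line.split(",");
            String myKey = w[4] + "\t" + w[3]; // dno, then sex
            int sal = Integer.parseInt(w[2]);
            totals.merge(myKey, sal, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] emp = { "1,aaa,1000,m,11", "2,bbb,2000,f,11", "3,ccc,500,m,11" };
        // key "11\tm" -> 1500, key "11\tf" -> 2000
        System.out.println(sumByDnoSex(emp));
    }
}
```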










How to run a mapreduce jar
---------------------------
$hadoop jar JarName.jar  \
DriverName \
Inputpath \
outputpath

$cat > myfile
I love teaching
I walk teaching
I love hadoop
hadoop is good
^c

copy to hdfs
-------------
$hadoop fs -put myfile /user/cloudera/

example
--------
hadoop jar wordcount.jar \
com.wordcount.MyDriver \
/user/cloudera/myfile \
/user/cloudera/mr/wc


Reducer creates a file under the path
/user/cloudera/mr/wc
part-r-00000

r-->reducer output.




How to disable a Reducer
------------------------
j.setNumReduceTasks(0);

This makes the job map-only: the mapper output is written directly, to files named part-m-00000.

NullWritable in MapReduce
-------------------------
NullWritable is a zero-byte placeholder Writable; use it as the value type when only the key matters (for example, to emit distinct keys).


MyMapper.java
-------------
package com.manohar;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException {
        // record layout: id,name,sal,gender,dno
        String str = v.toString();
        String w[] = str.split(",");
        // key is dno + gender; the value carries no data
        String dg = w[4] + w[3];
        con.write(new Text(dg), NullWritable.get());
    }
}
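The effect of emitting NullWritable values is that the shuffle/sort phase leaves only distinct keys; a plain-Java equivalent of this dedup, using a set:

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class DistinctDemo {
    // Emitting (dno+gender, NullWritable) and letting the framework group
    // keys is equivalent to collecting distinct keys in a set.
    public static Set<String> distinctDnoGender(String[] lines) {
        Set<String> keys = new LinkedHashSet<>();
        for (String line : lines) {
            String[] w = line.split(",");
            keys.add(w[4] + w[3]); // dno + gender, as in the mapper above
        }
        return keys;
    }

    public static void main(String[] args) {
        String[] emp = { "1,aaa,1000,m,11", "2,bbb,2000,m,11", "3,ccc,500,f,12" };
        System.out.println(distinctDnoGender(emp)); // [11m, 12f]
    }
}
```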

Merging multiple files
----------------------
MyMapper.java
-------------
public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException {
        // pass each input line through unchanged; the line itself is the key
        con.write(v, NullWritable.get());
    }
}

MyMapper2.java
--------------
package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper2 extends Mapper<LongWritable, Text, Text, NullWritable> {
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException {
        // emp3 layout is id,name,sal,dno,gender -- swap the last two columns
        // so the output matches the id,name,sal,gender,dno layout of emp1/emp2
        String str = v.toString();
        String w[] = str.split(",");
        String line = w[0] + "," + w[1] + "," + w[2] + "," + w[4] + "," + w[3];

        con.write(new Text(line), NullWritable.get());
    }
}
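The column swap MyMapper2 performs can be checked in isolation with a plain-Java sketch:

```java
public class ReorderDemo {
    // Rebuilds a record with the last two columns swapped, as MyMapper2 does,
    // so emp3's "id,name,sal,dno,gender" layout matches the other files.
    public static String reorder(String line) {
        String[] w = line.split(",");
        return w[0] + "," + w[1] + "," + w[2] + "," + w[4] + "," + w[3];
    }

    public static void main(String[] args) {
        System.out.println(reorder("3,ccc,1000,14,m")); // 3,ccc,1000,m,14
    }
}
```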

MyDriver.java
-------------
package com.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = new Job(c, "Merge");
        j.setJarByClass(MyDriver.class);
        //j.setNumReduceTasks(1);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(NullWritable.class);

        Path p1 = new Path(args[0]); // emp1
        Path p2 = new Path(args[1]); // emp2
        Path p3 = new Path(args[2]); // emp3
        Path p4 = new Path(args[3]); // output

        // each input path gets its own mapper class
        MultipleInputs.addInputPath(j, p1, TextInputFormat.class, MyMapper.class);
        MultipleInputs.addInputPath(j, p2, TextInputFormat.class, MyMapper.class);
        MultipleInputs.addInputPath(j, p3, TextInputFormat.class, MyMapper2.class);
        FileOutputFormat.setOutputPath(j, p4);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}


create a jar file-->merge.jar

cat > emp1
1,aaa,1000,m,11


cat > emp2
2,bbb,2000,f,12

cat > emp3
3,ccc,1000,14,m

Place the 3 files in hdfs
-------------------------
hadoop fs -put emp1 /user/cloudera
hadoop fs -put emp2 /user/cloudera
hadoop fs -put emp3 /user/cloudera

How to run
----------
$hadoop jar merge.jar \
com.wordcount.MyDriver \
emp1 \
emp2 \
emp3 \
/user/cloudera/merge

