Thursday, June 8, 2017

What is streaming / log data?

Generally, most of the data to be analyzed is produced by various data sources such as application servers, social networking sites, cloud servers, and enterprise servers. This data is in the form of log files and events.

What is a log file?

A log file records the events or actions that occur in a server or application, typically one event per line.

Traditionally, we transfer data into HDFS using the put command.

Problems with the put command
put can transfer only one file at a time, and the file must be complete before the transfer starts, so it cannot handle continuously generated streaming data.

What we need is a way to move data from the data generators into a centralized store as it is produced.
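As a quick illustration, copying a finished log file into HDFS with the put command looks like this (the file and target paths are hypothetical):

```shell
# Copies one complete file at a time; cannot follow a file
# that is still being written to.
hadoop fs -put /var/log/app/access.log /user/cloudera/logs/
```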


What is Flume?
Apache Flume is a tool/service/data-ingestion mechanism for collecting, aggregating, and transporting large amounts of streaming data, such as log files and events, from various sources to a centralized data store.

Flume is a highly reliable, distributed, and configurable tool. It is designed to copy streaming data (log data) from various web servers to HDFS.

Features of flume
-----------------
Flume is used to transfer log data from multiple web servers into a centralized store (HDFS, HBase) efficiently.

Using Flume, we can get the data from multiple servers immediately into Hadoop.

Along with the log files, Flume is also used to import huge volumes of event data produced by social networking sites like Facebook and Twitter, and e-commerce websites like Amazon and Flipkart.

Flume supports a large set of source and destination types.

Flume supports multi-hop flows, fan-in and fan-out flows, contextual routing, etc.

Flume can be scaled horizontally.

Flume Architecture
------------------
Data Generators----->Agent1 ---------
               |                    |
               v                    V
               ----->Agent2 -------->DataCollector---->HDFS
               |                    ^
               V                    |
               ----->Agent3 ---------


Data generators (e.g., Facebook, Twitter) produce the data.

Agents collect data from the data generators.

The data collector collects data from the agents, aggregates it, and pushes it into HDFS.
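In a multi-hop flow like the one above, agents are typically chained by pointing an Avro sink on one agent at an Avro source on the next. A minimal sketch (the agent names, hostname, and port below are assumptions, not from this post):

```properties
# On an edge agent: forward events to the collector over Avro
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector.example.com
a1.sinks.k1.port = 4545

# On the collector agent: receive events from the edge agents
collector.sources.r1.type = avro
collector.sources.r1.bind = 0.0.0.0
collector.sources.r1.port = 4545
```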

Components of Flume
-------------------
------------
Flume Event|
------------
It is a unit of data that is transported from source to destination.

It contains the data payload in the form of a byte array, along with optional headers.

Structure of a Flume Event
--------------------
|Headers|Byte Array|
--------------------

Flume Agent
-----------
It is an independent daemon process which accepts data from clients or other agents and forwards it to the next destination, i.e. a sink or another agent.

           ----------------
source--->()   Channel     ()---->sink---->HDFS
           ----------------


A Flume agent has the following core components:
i)Source
ii)Channel
iii)Sink


Source
------
It receives data from the data generators and transfers it to one or more channels in the form of Flume events.

Flume offers many source types, and each source type receives data from a particular kind of data generator.

ex: Avro source, Thrift source, Twitter 1% source, etc.


Channel
-------
It acts as a buffer between sources and sinks. The data is stored in the channel until it is consumed by the sinks.

A channel is fully transactional and can work with any number of sources and sinks.

ex: JDBC channel, File channel, Memory channel.
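For durability, the memory channel used in the examples below can be swapped for a file channel, which persists events to disk and survives agent restarts. A sketch (the directory paths are assumptions):

```properties
# File channel: slower than memory, but events survive a crash
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/flume/checkpoint
a1.channels.c1.dataDir = /var/flume/data
```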

sink
----
A sink stores data in a centralized store such as HDFS or HBase. It consumes data from the channels and delivers it to HDFS/HBase or to another agent.

ex:
HDFS sink

A Flume agent can have multiple sources, channels, and sinks.


Additional components of a flume agent
--------------------------------------
Interceptors

Channel selectors

Sink processors
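For example, an interceptor can stamp every event with a header before it reaches the channel. A minimal sketch using the built-in timestamp interceptor (the agent and source names follow the examples below):

```properties
# Add a 'timestamp' header to every event from src1
a1.sources.src1.interceptors = i1
a1.sources.src1.interceptors.i1.type = timestamp
```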

creating flume configuration file
---------------------------------
We use a configuration file to define Flume properties in the form of key-value pairs.

In a configuration file we should specify the following:
Name the components of the agent.
Describe/Configure the source.
Describe/Configure the sink.
Describe/Configure the channel.
Bind the source and the sink to the channel.

Usually we have multiple agents in Flume.
We identify each agent by a unique name.
The name of the agent is used to configure that agent in Flume.

Naming the Components
agent_name.sources = source_name
agent_name.sinks = sink_name
agent_name.channels = channel_name

ex:
a1.sources = src1
a1.sinks = s1
a1.channels = c1

Describing the Source
agent_name.sources.source_name.type = value
agent_name.sources.source_name.property2 = value
agent_name.sources.source_name.property3 = value

ex:
# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1


Describing the sink

agent_name.sinks.sink_name.type = value
agent_name.sinks.sink_name.property2 = value
agent_name.sinks.sink_name.property3 = value

ex:
# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/myFlume


Describing channel
------------------
agent_name.channels.channel_name.type =value
agent_name.channels.channel_name.property1 = value
agent_name.channels.channel_name.property2 = value
...

ex:
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

Bind source and sink with channel
---------------------------------
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name



$gedit agent1.conf

agent1.conf
_______________________

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1


# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/myFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

############################################
Submit the above flow using the following commands. First create the target directory in HDFS:

[cloudera@quickstart ~]$ hadoop fs -mkdir myFlume

Run the Flume job:
[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file agent1.conf --name a1 -Dflume.root.logger=INFO,console


flume-ng agent --> runs a Flume agent
--conf --> the Flume configuration directory
--conf-file --> the name of the Flume config file
--name --> the name of the agent

In HDFS:
[cloudera@quickstart ~]$ hadoop fs -ls myFlume
note: By default the HDFS sink writes in sequence file format.

$hadoop fs -cat /user/cloudera/myFlume/FlumeData.123456789


create table tab1(line string)
stored as sequencefile;

load data inpath '/user/cloudera/myFlume/FlumeData.123456789' into table tab1;

select * from tab1;


The risk in the above flow is that if the channel fails or the channel's system is down, data will be lost. To provide fault tolerance for the channel, use the following flow.


agent2.conf
___________________
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = src1
a1.sinks = s1 s2
a1.channels = c1 c2

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1


# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume

a1.sinks.s2.type = hdfs
a1.sinks.s2.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1200
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1 c2
a1.sinks.s1.channel = c1
a1.sinks.s2.channel = c2

[cloudera@quickstart ~]$ hadoop fs -mkdir urFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file agent2.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls urFlume

In the above flow, src1 writes into both the c1 and c2 channels.
If one channel fails, the data is still available in the other.
But if no failure happens, the data is duplicated in the target Hadoop directory, so before processing the data we need to eliminate the duplicate records.
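One simple way to eliminate the duplicates, reusing the Hive approach shown earlier (the table name tab2 here is hypothetical), is:

```sql
-- keep only the distinct lines from the loaded Flume output
create table tab2 as
select distinct line from tab1;
```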


Task --> importing data from a Hive table

conf/agent3.conf
______________________________
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=hive -e 'select * from mraw'


# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/ourFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

#############################################

[cloudera@quickstart ~]$ hive
hive> create table mraw(line string);
OK
Time taken: 2.218 seconds
hive> load data local inpath 'f1' into table mraw;
hive> select * from mraw limit 5;
OK
aaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
Time taken: 0.341 seconds, Fetched: 5 row(s)
hive>

[cloudera@quickstart ~]$ hadoop fs -mkdir ourFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent3.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls ourFlume

In all of the above cases, the output is in sequence file format.


conf/agent4.conf
________________________________

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1


# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/naFlume


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

######################################

The above flow writes its output in text format.

[cloudera@quickstart ~]$ hadoop fs -mkdir naFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent4.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls naFlume

[cloudera@quickstart ~]$ hadoop fs -cat naFlume/FlumeData.1471351131067
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
ddddddddddddddddddddddddddddd
ccccccccccccccccccccccccccccccc
cccccccccccccccccccccccccccccc
ccccccccccccccccccccccccccccccc

The above output is in text format.
