Streaming / Log Data
--------------------
Generally, most of the data to be analyzed is produced by various data sources such as application servers, social networking sites, cloud servers, and enterprise servers. This data is in the form of log files and events.
A log file is a file that records events as they occur in a system or application, typically one event per line.
Traditionally, we transfer data into HDFS using the put command.
Problems with the put command:
put can transfer only one file at a time.
put needs a complete file before upload, so it cannot ingest data that is generated continuously (streaming data).
The data generators (web servers, applications, social networking sites) produce data continuously, and we need a way to move that data into a centralized store (HDFS, HBase) as it arrives.
What is Flume?
Apache Flume is a tool/service/data ingestion mechanism for collecting, aggregating, and transporting large amounts of streaming data, such as log files and events, from various sources to a centralized data store.
Flume is a highly reliable, distributed, and configurable tool. It is designed to copy streaming data (log data) from various web servers to HDFS.
Features of flume
-----------------
Flume is used to transfer log data from multiple web servers into a centralized store (HDFS, HBase) efficiently.
Using Flume, we can get the data from multiple servers immediately into Hadoop.
Along with the log files, Flume is also used to import huge volumes of event data produced by social networking sites like Facebook and Twitter, and e-commerce websites like Amazon and Flipkart.
Flume supports a large set of source and destination types.
Flume supports multi-hop flows, fan-in and fan-out flows, contextual routing, etc.
Flume can be scaled horizontally.
Flume Architecture
------------------
                 +---> Agent1 ---+
                 |               |
Data Generators -+---> Agent2 ---+---> Data Collector ---> HDFS
                 |               |
                 +---> Agent3 ---+
Data generators (Facebook, Twitter, etc.) generate the data.
Agents collect data from the data generators.
The data collector collects data from the agents, aggregates it, and pushes it into HDFS.
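The fan-in flow above can be sketched as a toy Python simulation (this is not the Flume API; the names `agent` and `collector_q` are invented for illustration): several agent threads forward their events into a shared collector queue, and the collector drains everything before the write to the store.

```python
import queue
import threading

# Shared queue standing in for the data collector's inbound channel.
collector_q = queue.Queue()

def agent(name, events):
    # An agent collects events from its data generator and forwards them.
    for e in events:
        collector_q.put((name, e))

threads = [
    threading.Thread(target=agent, args=("agent1", ["fb-log-1"])),
    threading.Thread(target=agent, args=("agent2", ["tw-log-1", "tw-log-2"])),
    threading.Thread(target=agent, args=("agent3", ["web-log-1"])),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The collector aggregates events from all agents before the HDFS write.
store = []
while not collector_q.empty():
    store.append(collector_q.get())

print(len(store))  # 4 events reached the centralized store
```

Fan-in means many producers converge on one consumer, which is why the collector buffers: agents can push concurrently without waiting on the store.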
Components of Flume
-------------------
Flume Event
-----------
It is a unit of data transported from source to destination.
It contains the data in the form of a byte array.
Structure of a Flume Event
--------------------
|Headers|Byte Array|
--------------------
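The |Headers|Byte Array| layout can be modelled with a small Python class (an illustrative model, not Flume's actual Event interface; the sample header keys are made up): optional string headers plus an opaque byte-array payload.

```python
from dataclasses import dataclass, field

@dataclass
class FlumeEvent:
    # Mirrors the |Headers|Byte Array| layout: string key/value headers
    # plus an opaque byte-array body carrying the actual record.
    body: bytes
    headers: dict = field(default_factory=dict)

ev = FlumeEvent(
    body=b"127.0.0.1 - GET /index.html 200",
    headers={"timestamp": "1471351131067", "host": "web01"},
)

# The body is raw bytes; any structure (CSV, JSON, log line) is opaque
# to Flume itself and is interpreted only at the destination.
print(ev.headers["host"], len(ev.body))
```

Headers let routing decisions (e.g. contextual routing, interceptors) be made without parsing the payload.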
Flume Agent
-----------
It is an independent daemon process which accepts data from clients or other agents and forwards it to the next destination, i.e. to a sink or another agent.
            -----------
source ---> | Channel | ---> sink ---> HDFS
            -----------
A Flume agent has the following core components:
i)Source
ii)Channel
iii)Sink
Source
------
It receives data from data generators and transfers it to one or more channels in the form of Flume events.
Flume has many source types, and each source receives data from a different kind of data generator.
ex: Avro source, Thrift source, Twitter 1% source, etc.
Channel
-------
It acts as a buffer between sources and sinks. The data is stored in the channel until it is consumed by the sinks.
A channel is fully transactional and can work with any number of sources and sinks.
ex: JDBC channel, File channel, Memory channel.
Sink
----
A sink stores the data into centralized storage such as HDFS or HBase. It consumes data from the channels and delivers it to HDFS/HBase or to another agent.
ex:
Hdfs sink
A Flume agent can have multiple sources, channels, and sinks.
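The source → channel → sink flow can be sketched as a minimal in-process pipeline (a toy model, not the Flume API; `run_agent` and the `hdfs` list are stand-ins): the source wraps input lines as events, the channel buffers them, and the sink drains the channel into the destination.

```python
from collections import deque

def source(lines):
    # Source: wraps each input line as an event (here, a plain bytes body).
    return [line.encode() for line in lines]

def run_agent(lines):
    channel = deque()              # Channel: buffer between source and sink
    for event in source(lines):
        channel.append(event)      # the source puts events into the channel
    hdfs = []                      # stand-in for the HDFS sink's destination
    while channel:
        hdfs.append(channel.popleft())  # Sink: consumes events from the channel
    return hdfs

out = run_agent(["aaa", "bbb", "ccc"])
print(out)  # [b'aaa', b'bbb', b'ccc']
```

The channel decouples the two ends: the source never talks to the sink directly, which is what lets real Flume swap sinks, replicate to several channels, or survive a slow destination.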
Additional components of a flume agent
--------------------------------------
Interceptors
Channel selectors
Sink processors
Creating a Flume configuration file
-----------------------------------
We use a configuration file to define Flume properties in the form of key-value pairs.
In a configuration file we should specify the following:
Name the components of the agent.
Describe/configure the source.
Describe/configure the sink.
Describe/configure the channel.
Bind the source and the sink to the channel.
Usually we have multiple agents in Flume.
We identify each agent by a unique name, and that name is used to configure the agent.
Naming the Components
agent_name.sources = source_name
agent_name.sinks = sink_name
agent_name.channels = channel_name
ex:
a1.sources = src1
a1.sinks = s1
a1.channels = c1
Describing the Source
agent_name.sources.source_name.type = value
agent_name.sources.source_name.property2 = value
agent_name.sources.source_name.property3 = value
ex:
# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1
Describing the sink
agent_name.sinks.sink_name.type = value
agent_name.sinks.sink_name.property2 = value
agent_name.sinks.sink_name.property3 = value
ex:
# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/myFlume
Describing channel
------------------
agent_name.channels.channel_name.type =value
agent_name.channels.channel_name.property1 = value
agent_name.channels.channel_name.property2 = value
...
ex:
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100
Bind source and sink with channel
---------------------------------
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name
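Since the configuration is just key = value pairs, the bindings can be sanity-checked with a short script before submitting the job. This helper is not part of Flume (the `parse_props` name is invented); it parses the properties and confirms each sink is bound to a declared channel.

```python
def parse_props(text):
    # Parse Flume-style "key = value" lines into a dict, skipping comments.
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

conf = """
# minimal agent definition
a1.sources = src1
a1.sinks = s1
a1.channels = c1
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1
"""

props = parse_props(conf)
declared = set(props["a1.channels"].split())
# A sink bound to an undeclared channel is a common typo in these files.
assert props["a1.sinks.s1.channel"] in declared
print("bindings ok")  # prints "bindings ok"
```

A check like this catches the frequent mistake of writing `channels` (plural) on a sink or `channel` (singular) on a source, which Flume only reports at agent startup.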
$gedit agent1.conf
agent1.conf
_______________________
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1
# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1
# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/myFlume
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1
############################################
Submit the above flow using the following commands:
[cloudera@quickstart ~]$ hadoop fs -mkdir myFlume
Run the Flume job:
[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file agent1.conf --name a1 -Dflume.root.logger=INFO,console
flume-ng agent ----> runs a Flume agent
--conf ----> directory containing the Flume configuration
--conf-file ----> name of the Flume config file
--name ----> name of the agent
In HDFS:
[cloudera@quickstart ~]$ hadoop fs -ls myFlume
Note: by default, the HDFS sink writes in SequenceFile format.
$hadoop fs -cat /user/cloudera/myFlume/FlumeData.123456789
create table tab1(line string)
stored as sequencefile;
load data inpath '/user/cloudera/myFlume/FlumeData.123456789' into table tab1;
select * from tab1;
The risk in the above flow: if the channel fails or the channel's host goes down, data will be lost. To provide fault tolerance for the channel, use the following flow, which replicates events into two channels.
agent2.conf
___________________
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = src1
a1.sinks = s1 s2
a1.channels = c1 c2
# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1
# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume
a1.sinks.s2.type = hdfs
a1.sinks.s2.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1200
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.src1.channels = c1 c2
a1.sinks.s1.channel = c1
a1.sinks.s2.channel = c2
[cloudera@quickstart ~]$ hadoop fs -mkdir urFlume
[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file agent2.conf --name a1 -Dflume.root.logger=INFO,console
[cloudera@quickstart ~]$ hadoop fs -ls urFlume
In the above flow, src1 writes into both the c1 and c2 channels.
If one channel fails, the data is still available in the other.
But if no failure happens, the data is duplicated in the target Hadoop directory, so before processing the data we need to eliminate the duplicate records.
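One simple way to eliminate those duplicates is first-occurrence deduplication, sketched here in Python (in practice a SELECT DISTINCT in Hive over the landed data would serve the same purpose):

```python
def dedup(lines):
    # Keep only the first occurrence of each record, preserving order.
    seen = set()
    out = []
    for line in lines:
        if line not in seen:
            seen.add(line)
            out.append(line)
    return out

# Records as they might land when both channels delivered successfully.
records = ["aaa", "bbb", "aaa", "ccc", "bbb"]
print(dedup(records))  # ['aaa', 'bbb', 'ccc']
```

Note this treats identical lines as duplicates; if two distinct source events legitimately carry the same text, deduplication should key on an event header (e.g. a timestamp) instead.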
Task --> importing data from a Hive table
conf/agent3.conf
______________________________
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1
# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=hive -e 'select * from mraw'
# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/ourFlume
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1
#############################################
[cloudera@quickstart ~]$ hive
hive> create table mraw(line string);
OK
Time taken: 2.218 seconds
hive> load data local inpath 'f1' into table mraw;
hive> select * from mraw limit 5;
OK
aaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
Time taken: 0.341 seconds, Fetched: 5 row(s)
hive>
[cloudera@quickstart ~]$ hadoop fs -mkdir ourFlume
[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent3.conf --name a1 -Dflume.root.logger=INFO,console
[cloudera@quickstart ~]$ hadoop fs -ls ourFlume
In all of the above cases, the output is in SequenceFile format. To write plain text instead, configure the sink's hdfs.fileType and hdfs.writeFormat properties, as in the following flow.
conf/agent4.conf
________________________________
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1
# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1
# Describe sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/naFlume
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1
######################################
The above flow writes its output in text format.
[cloudera@quickstart ~]$ hadoop fs -mkdir naFlume
[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent4.conf --name a1 -Dflume.root.logger=INFO,console
[cloudera@quickstart ~]$ hadoop fs -ls naFlume
[cloudera@quickstart ~]$ hadoop fs -cat naFlume/FlumeData.1471351131067
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
ddddddddddddddddddddddddddddd
ccccccccccccccccccccccccccccccc
cccccccccccccccccccccccccccccc
ccccccccccccccccccccccccccccccc
The above output is in text format.