Question
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.

public class CascadingCopyPipe {
    public static void main(String[] args) {
        String input = args[0];
        String output = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingCopyPipe.class);
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

        Tap inTap = new Hfs(new TextDelimited(true, ","), input);
        Tap outTap = new Hfs(new TextDelimited(true, ","), output);

        Pipe copyPipe = new Pipe("Copy Pipeline");

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(copyPipe, inTap)
            .addTailSink(copyPipe, outTap);

        flowConnector.connect(flowDef).complete();
    }
}
What’s the source?
What’s the sink?
How do you declare a source and a sink for a pipe?
Explanation / Answer
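The source is the starting point of the flow: the tap that the pipe reads its input from. In the question's code it is inTap, an Hfs tap that reads comma-delimited text with a header row from the path given in args[0].
The sink is the end point of the flow: the tap that the pipe writes its output to. In the question's code it is outTap, an Hfs tap that writes comma-delimited text with a header row to the path given in args[1].
To declare them, create the source and sink taps, then bind each one to a pipe in a FlowDef: addSource attaches the source tap to the head of a pipe, and addTailSink attaches the sink tap to the tail. The snippet below shows the same pattern for a word-count flow, where docPath, wcPath, docPipe, and wcPipe are assumed to be defined elsewhere. The Hadoop code will be: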
// Create the source and sink taps (docPath is the input path, wcPath the output path)
Tap source = new Hfs( new TextDelimited( true, " " ), docPath );
Tap sink = new Hfs( new TextDelimited( true, " " ), wcPath );

// Bind the source tap to the head pipe and the sink tap to the tail pipe
FlowDef flowDef = FlowDef.flowDef()
    .setName( "wc" )
    .addSource( docPipe, source )
    .addTailSink( wcPipe, sink );
return flowDef;
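
The roles of source and sink can also be seen without a Hadoop cluster. The following is a minimal plain-Java sketch of the idea, not Cascading's API: Source, Sink, runFlow, and copy are hypothetical names made up for this illustration. A source supplies lines, a pipe transforms each line (here it does nothing, like the copy pipe in the question), and a sink receives the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Conceptual model only: these types are hypothetical stand-ins, not Cascading classes.
class SourceSinkSketch {

    /** Stand-in for a source tap: where the flow reads its input. */
    interface Source {
        List<String> read();
    }

    /** Stand-in for a sink tap: where the flow writes its output. */
    interface Sink {
        void write(List<String> lines);
    }

    /** Reads every line from the source, passes it through the pipe, and hands the result to the sink. */
    static void runFlow(Source source, UnaryOperator<String> pipe, Sink sink) {
        List<String> out = new ArrayList<>();
        for (String line : source.read()) {
            out.add(pipe.apply(line));
        }
        sink.write(out);
    }

    /** A copy flow: the pipe is the identity function, so the sink receives exactly what the source supplied. */
    static List<String> copy(List<String> input) {
        List<String> collected = new ArrayList<>();
        runFlow(() -> input, UnaryOperator.identity(), collected::addAll);
        return collected;
    }

    public static void main(String[] args) {
        // Comma-delimited lines with a header row, like the question's input format.
        System.out.println(copy(List.of("id,name", "1,alice", "2,bob")));
    }
}
```

In the question's Cascading code, inTap plays the part of Source, copyPipe plays the part of the identity pipe, and outTap plays the part of Sink; the FlowDef is what wires the three together.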