Question

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.

public class CascadingCopyPipe {
    public static void main(String[] args) {
        String input = args[0];
        String output = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingCopyPipe.class);
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

        Tap inTap = new Hfs(new TextDelimited(true, ","), input);
        Tap outTap = new Hfs(new TextDelimited(true, ","), output);

        Pipe copyPipe = new Pipe("Copy Pipeline");

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(copyPipe, inTap)
            .addTailSink(copyPipe, outTap);

        flowConnector.connect(flowDef).complete();
    }
}

What’s the source?

What’s the sink?

How do you declare a source and a sink for a pipe?

Explanation / Answer

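The source is the starting point of the flow: the tap that the pipe reads its tuples from. In the question's program it is inTap, an Hfs tap that reads the comma-delimited file at args[0] and treats its first line as a header.

The sink is the end point of the flow: the tap that the pipe's output is written to. In the question's program it is outTap, an Hfs tap that writes comma-delimited text, with a header line, to args[1].

To declare them, create a source tap and a sink tap, then bind each one to a pipe in the FlowDef: addSource attaches a source tap to the head of a pipe, and addTailSink attaches a sink tap to the tail of a pipe. On Hadoop the code looks like this: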

  
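// Create the source and sink taps. docPath and wcPath are the input and
// output paths, defined elsewhere.
Tap source = new Hfs( new TextDelimited( true, " " ), docPath );
Tap sink = new Hfs( new TextDelimited( true, " " ), wcPath );

// Bind the taps to the pipes: docPipe is the head of the assembly and
// wcPipe is its tail, both defined elsewhere.
FlowDef flowDef = FlowDef.flowDef()
    .setName( "wc" )
    .addSource( docPipe, source )
    .addTailSink( wcPipe, sink );

return flowDef;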
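
To see what the source and sink roles amount to without a Hadoop setup, here is a minimal plain-Java sketch of the copy flow from the question. It is only an analogy and does not use or reproduce Cascading's API: the class name CopyFlowSketch and the method copy are made up for illustration, an in-memory Reader stands in for the source tap, and a Writer stands in for the sink tap. As with TextDelimited(true, ","), the first line is treated as a header and each following line is split on the delimiter.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.regex.Pattern;

// Library-free analogy of the copy flow. This is NOT Cascading code;
// CopyFlowSketch and copy are hypothetical names used for illustration.
public class CopyFlowSketch {

    // Reads every record from the source and writes it to the sink,
    // which is roughly the job the connected flow does for a copy pipe.
    // Returns the number of data records copied (the header is not counted).
    public static int copy(Reader source, Writer sink, String delimiter) throws IOException {
        BufferedReader in = new BufferedReader(source);

        // The first line is the header: pass it through unchanged.
        String line = in.readLine();
        if (line == null) {
            return 0; // empty source, nothing to copy
        }
        sink.write(line);
        sink.write('\n');

        int records = 0;
        while ((line = in.readLine()) != null) {
            // Split into fields (keeping trailing empty fields), then re-join.
            // A real pipe could transform the fields between these two steps.
            String[] fields = line.split(Pattern.quote(delimiter), -1);
            sink.write(String.join(delimiter, fields));
            sink.write('\n');
            records++;
        }
        sink.flush();
        return records;
    }

    public static void main(String[] args) throws IOException {
        Reader source = new StringReader("id,name\n1,ada\n2,bob\n"); // stands in for inTap
        Writer sink = new StringWriter();                            // stands in for outTap
        int copied = copy(source, sink, ",");
        System.out.println(copied + " records copied");
        System.out.print(sink);
    }
}
```

The point of the sketch is the division of labor: the source only knows how to produce records, the sink only knows how to store them, and the code in between (the pipe) never needs to know where either one lives. That is why the question's program can swap input and output paths without touching copyPipe.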