My first Apache Beam program: a word-count example
Word count
Java source file (BasicWordCountWithExplaination.java):
package com.test.training;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
public class BasicWordCountWithExplaination {
 public static void main(String[] args) {
     // Create a PipelineOptions object. This object lets us set various execution
        // options for our pipeline, such as the runner you wish to use. This example
        // will run with the DirectRunner by default, based on the class path configured
        // in its dependencies.
     PipelineOptions options = PipelineOptionsFactory.create();
        // Create the Pipeline object with the options we defined above
     Pipeline p = Pipeline.create(options);
        /*
         * go to google cloud platform
         * select storage
         * create bucket
         * upload text file add some words
         * copy location of bucket
         * paste it below
         * save and run
         */
     PCollection<String> line=p.apply(TextIO.read().from("gs://testingbucketv/text.txt"));
     PCollection<String> count=line.apply(ParDo.of(new DoFn<String,String>(){
      @ProcessElement
      public void ProcessElement(ProcessContext c)
      {
       String arr[]=c.element().split(" ");
       System.out.println("length of words"+arr.length);
       c.output("Length of words is="+String.valueOf(arr.length));
       c.output("\n\nWords are=\n"+c.element());
      }
     }));
     //it will create file and store length
     count.apply(TextIO.write().to("gs://testingbucketv/output.txt\")").withoutSharding());
     //without sharding for single file
     //pipeline run
     p.run();
 }
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Input file (text.txt):
hello dear Happy diwali... thanks
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Output file (output.txt):
Length of words is=5 Words are= hello dear Happy diwali... thanks
Comments
Post a Comment