Read CSV files from S3 using Spring Batch
In this post, let's see how to read files from S3 using Spring Batch.
We will read the CSV files as a stream, processing each item without loading the entire data set into memory. For the item writer, we'll use a no-op item writer, since we won't write the items to a file or to any database tables.
Job configuration
Let’s start with the Job configuration.
@Bean
public Job processMovieDataFromS3Job() throws Exception {
    FlowBuilder<Flow> dataProcessFlowBuilder = new FlowBuilder<Flow>("dataProcess-flow");
    Flow dataProcessFlow = dataProcessFlowBuilder
            .start(movieDataProcessStep())
            .on(ExitStatus.COMPLETED.getExitCode()).end()
            .from(movieDataProcessStep())
            .on(ANY_OTHER_EXIT_STATUS).fail()
            .end();

    return jobBuilderFactory.get("movieDataProcess-job")
            .incrementer(new RunIdIncrementer())
            .start(dataProcessFlow)
            .end()
            .listener(movieJobExecutionListener())
            .build();
}
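The flow above references an ANY_OTHER_EXIT_STATUS constant and a movieJobExecutionListener() bean that are not shown in this post. Presumably the constant is the wildcard "*", and the listener is something along these lines (a sketch with an assumed implementation, not the actual project code):

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

// Assumed value: matches any exit status not handled earlier in the flow
private static final String ANY_OTHER_EXIT_STATUS = "*";

@Bean
public JobExecutionListener movieJobExecutionListener() {
    return new JobExecutionListenerSupport() {
        @Override
        public void afterJob(JobExecution jobExecution) {
            // Log the final status once the whole job has finished
            log.info("Job {} finished with status {}",
                    jobExecution.getJobInstance().getJobName(),
                    jobExecution.getStatus());
        }
    };
}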
Next, we will define a step to read and process data from the CSV files present in S3.
@Bean
public Step movieDataProcessStep() throws Exception {
    Step step = this.stepBuilderFactory.get("dataProcess-step")
            .<Movie, Movie>chunk(chunkSize)
            .reader(movieDataReader())
            .processor(asyncMovieDataProcessor())
            .writer(asyncMovieItemWriter())
            .build();
    ((TaskletStep) step).setStepExecutionListeners(new StepExecutionListener[]{
            movieStepExecutionListener()
    });
    return step;
}
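Likewise, movieStepExecutionListener() is not shown in the post; a minimal sketch of a step listener that logs read and write counts (assumed implementation):

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;

@Bean
public StepExecutionListener movieStepExecutionListener() {
    return new StepExecutionListenerSupport() {
        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            // Log how many items the step read and wrote before it exits
            log.info("Step {} readCount={} writeCount={}",
                    stepExecution.getStepName(),
                    stepExecution.getReadCount(),
                    stepExecution.getWriteCount());
            return stepExecution.getExitStatus();
        }
    };
}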
In the step, we will use the read-process-write pattern.
Item reader
For the reader configuration, we will use a SynchronizedItemStreamReader delegating to a MultiResourceItemReader. To the MultiResourceItemReader, we will pass a list of resources; the resources in this case are the files in S3.
Update the values of sourceBucket and sourceObjectPrefix based on the S3 bucket details of your use case.
@Bean(destroyMethod = "")
@StepScope
public SynchronizedItemStreamReader<Movie> movieDataReader() {
    SynchronizedItemStreamReader<Movie> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    List<Resource> resourceList = new ArrayList<>();
    String sourceBucket = rawDataS3Bucket;
    String sourceObjectPrefix = rawDataS3ObjectPrefix
            .concat("MOVIES")
            .concat(FORWARD_SLASH);
    log.info("sourceObjectPrefix::" + sourceObjectPrefix);

    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
            .withBucketName(sourceBucket)
            .withPrefix(sourceObjectPrefix);
    ObjectListing sourceObjectsListing;
    do {
        sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
        for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries()) {
            // Skip if file is empty (or) file extension is not "csv"
            if (!(sourceFile.getSize() > 0)
                    || (!sourceFile.getKey().endsWith(DOT.concat(inputDataFileExtension)))) {
                continue;
            }
            log.info("Reading " + sourceFile.getKey());
            resourceList.add(resourceLoader.getResource(
                    S3_PROTOCOL_PREFIX.concat(sourceBucket).concat(FORWARD_SLASH)
                            .concat(sourceFile.getKey())));
        }
        listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
    } while (sourceObjectsListing.isTruncated());

    Resource[] resources = resourceList.toArray(new Resource[resourceList.size()]);

    MultiResourceItemReader<Movie> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setName("movie-multiResource-Reader");
    multiResourceItemReader.setResources(resources);
    multiResourceItemReader.setDelegate(movieFileItemReader());

    synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
    return synchronizedItemStreamReader;
}
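The reader also references a few constants and injected collaborators that are not defined in the snippet. Plausible definitions would look like the following (the values and wiring are my assumptions); note that resolving s3:// URLs through Spring's ResourceLoader typically requires Spring Cloud AWS on the classpath.

import com.amazonaws.services.s3.AmazonS3;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ResourceLoader;

// Assumed values for the constants used when building object keys and URLs
private static final String S3_PROTOCOL_PREFIX = "s3://";
private static final String FORWARD_SLASH = "/";
private static final String DOT = ".";

// Injected collaborators: the AWS SDK v1 S3 client and Spring's resource loader
@Autowired
private AmazonS3 amazonS3Client;

@Autowired
private ResourceLoader resourceLoader;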
For this example, I have created a sample movie data set. The flat file item reader configuration to parse the CSV files is as below.
@Bean
@StepScope
public FlatFileItemReader<Movie> movieFileItemReader() {
    FlatFileItemReader<Movie> reader = new FlatFileItemReader<>();
    reader.setLinesToSkip(1);

    DefaultLineMapper<Movie> movieDataLineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(new String[] { "name", "genre", "releaseYear", "releasePlatform" });

    movieDataLineMapper.setFieldSetMapper(movieFieldSetMapper());
    movieDataLineMapper.setLineTokenizer(tokenizer);

    reader.setLineMapper(movieDataLineMapper);
    reader.setRecordSeparatorPolicy(new DefaultRecordSeparatorPolicy());
    return reader;
}

@Bean
public FieldSetMapper<Movie> movieFieldSetMapper() {
    return new MovieFieldSetMapper();
}
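The MovieFieldSetMapper returned above is not shown in the post. A minimal sketch of the Movie item and its mapper, assuming Lombok for the accessors and plain string fields matching the tokenizer column names:

import lombok.Data;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

// Assumed shape of the Movie item, matching the tokenizer column names
@Data
public class Movie {
    private String name;
    private String genre;
    private String releaseYear;
    private String releasePlatform;
}

// Maps one tokenized CSV line to a Movie instance
public class MovieFieldSetMapper implements FieldSetMapper<Movie> {
    @Override
    public Movie mapFieldSet(FieldSet fieldSet) throws BindException {
        Movie movie = new Movie();
        movie.setName(fieldSet.readString("name"));
        movie.setGenre(fieldSet.readString("genre"));
        movie.setReleaseYear(fieldSet.readString("releaseYear"));
        movie.setReleasePlatform(fieldSet.readString("releasePlatform"));
        return movie;
    }
}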
Item processor
For the item processor, we will configure it with async capabilities. This is not strictly needed for the small sample data set in this post; however, when you want to process a huge data set, async processing will be very helpful.
// ASYNC ITEM PROCESSOR
@Bean
@StepScope
public ItemProcessor asyncMovieDataProcessor() {
    AsyncItemProcessor<Movie, Movie> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(movieDataProcessor(null));
    asyncItemProcessor.setTaskExecutor(asyncMovieTaskExecutor());
    return asyncItemProcessor;
}

// DELEGATE ITEM PROCESSOR
@Bean
@StepScope
public ItemProcessor<Movie, Movie> movieDataProcessor(@Value("#{stepExecution}") StepExecution stepExecution) {
    MovieDataProcessor movieDataProcessor = new MovieDataProcessor();
    return movieDataProcessor;
}

// TASK EXECUTOR
@Bean
public TaskExecutor asyncMovieTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threadPoolSize);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("movieExec-");
    return executor;
}
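The delegate MovieDataProcessor is also not shown in the post; since the writer is a no-op, it is presumably a simple pass-through processor, along these lines (a sketch, not the actual implementation):

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;

@Slf4j
public class MovieDataProcessor implements ItemProcessor<Movie, Movie> {
    @Override
    public Movie process(Movie movie) throws Exception {
        // Pass the item through unchanged; returning null would filter it out
        log.info("Processing movie: {}", movie.getName());
        return movie;
    }
}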
Item writer
Then, for the item writer, we configure it with async as well. This supports the item processor, which processes items on multiple threads: the writer has to unwrap the Future that gets returned for each item.
// WRITER
@Bean
@StepScope
public ItemWriter asyncMovieItemWriter() throws Exception {
    SimpleAsyncItemWriter<Movie> simpleAsyncItemWriter = new SimpleAsyncItemWriter<>();
    simpleAsyncItemWriter.setDelegate(movieNoOpItemWriter());
    return simpleAsyncItemWriter;
}

@Bean
@StepScope
public ItemWriter movieNoOpItemWriter() {
    return new NoOpItemWriter();
}
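Spring Batch does not ship a NoOpItemWriter class, so the project presumably defines its own. A sketch, assuming the List-based write signature of Spring Batch 4:

import java.util.List;
import org.springframework.batch.item.ItemWriter;

public class NoOpItemWriter implements ItemWriter<Movie> {
    @Override
    public void write(List<? extends Movie> items) throws Exception {
        // Intentionally empty: items are consumed but not persisted anywhere
    }
}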
Database
We will use an H2 in-memory database; Spring Batch needs a data source for its job repository metadata even though we don't write any items to it.
// H2 dependency
implementation group: 'com.h2database', name: 'h2', version: '1.4.193'

// Data source config
spring.batch.datasource.jdbc-url=jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1
spring.batch.datasource.driverClassName=org.h2.Driver
spring.batch.datasource.username=sa
spring.batch.datasource.password=
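Since the properties use a custom spring.batch.datasource prefix rather than Spring Boot's default spring.datasource, they are presumably bound to a data source bean explicitly. A minimal sketch of such a binding, assuming Boot's DataSourceBuilder (which defaults to HikariCP, where jdbc-url maps to jdbcUrl):

import javax.sql.DataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;

@Bean
@ConfigurationProperties(prefix = "spring.batch.datasource")
public DataSource batchDataSource() {
    // Binds spring.batch.datasource.* (jdbc-url, username, ...) onto the pool
    return DataSourceBuilder.create().build();
}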
Running the application
To run the application, just execute Run from the ReadFileFromS3BatchApplication.java file.
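For reference, the application class is presumably a standard Spring Boot entry point with batch processing enabled (a sketch; the actual file is in the project source):

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class ReadFileFromS3BatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReadFileFromS3BatchApplication.class, args);
    }
}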
I have created two movie data set CSV files for this example. The data set files are available under the resources folder in the project for reference. When the job runs, the execution logs show each source file being picked up and read.
Conclusion
In this article, we have explored how to read files from an S3 bucket using Spring Batch. The key takeaway is that we processed the files as a stream, without loading them entirely into memory.
The complete source code is available in the Git repository for this project.