IndexedCSVFileFormat.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.io.impl.csv.indexed.functions;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.tuples.CSVElement;

/**
 * This is an OutputFormat to serialize {@link Tuple}s to text by their labels.
 * The output is structured by record delimiters and field delimiters as common in CSV files.
 * Record delimiter separate records from each other ('\n' is common). Field
 * delimiters separate fields within a record.
 *
 * @param <T> Tuple that will be written to csv
 *
 * references to: org.apache.flink.api.java.io.CSVOutputFormat
 */
public class IndexedCSVFileFormat<T extends Tuple & CSVElement>
  extends MultipleFileOutputFormat<T> {

  /**
   * The default line delimiter if no one is set.
   */
  public static final String DEFAULT_LINE_DELIMITER = CSVConstants.ROW_DELIMITER;

  /**
   * The default field delimiter if no is set.
   */
  public static final String DEFAULT_FIELD_DELIMITER = CSVConstants.TOKEN_DELIMITER;

  // --------------------------------------------------------------------------------

  /**
   * The character the fields should be separated.
   */
  private String fieldDelimiter;

  /**
   * The character the lines should be separated.
   */
  private String recordDelimiter;

  /**
   * The charset that is used for the output encoding.
   */
  private String charsetName = null;

  /**
   * Creates a new instance of an IndexedCSVFileFormat. Use the default record delimiter '\n'
   * and the default field delimiter ','.
   *
   * @param outputPath The path where the CSV file will be written.
   */
  public IndexedCSVFileFormat(Path outputPath) {
    this(outputPath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER);
  }

  /**
   * Creates a new instance of an IndexedCSVFileFormat. Use the default record delimiter '\n'.
   *
   * @param outputPath The path where the CSV file will be written.
   * @param fieldDelimiter The field delimiter for the CSV file.
   */
  public IndexedCSVFileFormat(Path outputPath, String fieldDelimiter) {
    this(outputPath, DEFAULT_LINE_DELIMITER, fieldDelimiter);
  }

  /**
   * Creates a new instance of an IndexedCSVFileFormat.
   *
   * @param outputPath The path where the CSV file will be written.
   * @param recordDelimiter The record delimiter for the CSV file.
   * @param fieldDelimiter The field delimiter for the CSV file.
   */
  public IndexedCSVFileFormat(Path outputPath, String recordDelimiter, String fieldDelimiter) {
    super(outputPath);
    if (recordDelimiter == null) {
      throw new IllegalArgumentException("RecordDelmiter shall not be null.");
    }

    if (fieldDelimiter == null) {
      throw new IllegalArgumentException("FieldDelimiter shall not be null.");
    }

    this.fieldDelimiter = fieldDelimiter;
    this.recordDelimiter = recordDelimiter;
  }

  /**
   * Sets the charset with which the CSV strings are written to the file.
   * If not specified, the output format uses the systems default character encoding.
   *
   * @param charsetName The name of charset to use for encoding the output.
   */
  public void setCharsetName(String charsetName) {
    this.charsetName = charsetName;
  }

  @Override
  protected OutputFormat<T> createFormatForDirectory(Path directory) {
    CsvOutputFormat<T> format = new CsvOutputFormat<>(directory, recordDelimiter, fieldDelimiter);
    // OVERWRITE in a distributed fs would delete the label directory including already written
    // files of other workers. In a local fs it does not delete the directory but it would not
    // overwrite files of workers not having a specific label.
    // initializeGlobal() takes care of OVERWRITE.
    format.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
    if (charsetName != null) {
      format.setCharsetName(charsetName);
    }
    format.configure(configuration);
    return format;
  }

  @Override
  protected String getDirectoryForRecord(T record) {
    String label = record.getLabel();
    if (label.isEmpty()) {
      return cleanFilename(CSVConstants.DEFAULT_DIRECTORY) + Path.SEPARATOR +
        CSVConstants.SIMPLE_FILE;
    } else {
      return cleanFilename(label) + Path.SEPARATOR + CSVConstants.SIMPLE_FILE;
    }
  }
}