FlinkAsciiGraphLoader.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.util;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.util.AsciiGraphLoader;
import org.gradoop.common.util.GradoopConstants;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.epgm.LogicalGraphFactory;
import org.gradoop.flink.model.impl.functions.epgm.RenameLabel;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;

/**
 * Uses the {@link AsciiGraphLoader} to generate instances of
 * {@link LogicalGraph} and {@link GraphCollection} from GDL.
 *
 * @see <a href="https://github.com/dbs-leipzig/gdl">GDL on GitHub</a>
 */
public class FlinkAsciiGraphLoader {

  /**
   * Gradoop Flink configuration
   */
  private final GradoopFlinkConfig config;

  /**
   * AsciiGraphLoader to create graph, vertex and edge collections.
   */
  private AsciiGraphLoader<EPGMGraphHead, EPGMVertex, EPGMEdge> loader;

  /**
   * Creates a new FlinkAsciiGraphLoader instance.
   *
   * @param config Gradoop Flink configuration
   */
  public FlinkAsciiGraphLoader(GradoopFlinkConfig config) {
    if (config == null) {
      throw new IllegalArgumentException("Config must not be null.");
    }
    this.config = config;
  }

  /**
   * Initializes the database from the given ASCII GDL string.
   *
   * @param asciiGraphs GDL string (must not be {@code null})
   */
  public void initDatabaseFromString(String asciiGraphs) {
    if (asciiGraphs == null) {
      throw new IllegalArgumentException("AsciiGraph must not be null");
    }
    loader = AsciiGraphLoader.fromString(asciiGraphs, config.getLogicalGraphFactory());
  }

  /**
   * Initializes the database from the given ASCII GDL stream.
   *
   * @param stream GDL stream
   * @throws IOException on failure
   */
  public void initDatabaseFromStream(InputStream stream) throws IOException {
    if (stream == null) {
      throw new IllegalArgumentException("AsciiGraph must not be null");
    }
    loader = AsciiGraphLoader.fromStream(stream, config.getLogicalGraphFactory());
  }

  /**
   * Appends the given ASCII GDL String to the database.
   *
   * Variables previously used can be reused as their refer to the same objects.
   *
   * @param asciiGraph GDL string (must not be {@code null})
   */
  public void appendToDatabaseFromString(String asciiGraph) {
    if (asciiGraph == null) {
      throw new IllegalArgumentException("AsciiGraph must not be null");
    }
    if (loader != null) {
      loader.appendFromString(asciiGraph);
    } else {
      initDatabaseFromString(asciiGraph);
    }
  }

  /**
   * Initializes the database from the given GDL file.
   *
   * @param fileName GDL file name (must not be {@code null})
   * @throws IOException on failure
   */
  public void initDatabaseFromFile(String fileName) throws IOException {
    if (fileName == null) {
      throw new IllegalArgumentException("FileName must not be null.");
    }
    loader = AsciiGraphLoader.fromFile(fileName, config.getLogicalGraphFactory());
  }

  /**
   * Returns a logical graph containing the complete vertex and edge space of
   * the database.
   * This is equivalent to {@link #getLogicalGraph(boolean) getLogicalGraph(true)}.
   *
   * @return logical graph of vertex and edge space
   */
  public LogicalGraph getLogicalGraph() {
    return getLogicalGraph(true);
  }

  /**
   * Returns a logical graph containing the complete vertex and edge space of
   * the database.
   *
   * @param withGraphContainment true, if vertices and edges shall be updated to
   *                             be contained in the logical graph representing
   *                             the database
   * @return logical graph of vertex and edge space
   */
  public LogicalGraph getLogicalGraph(boolean withGraphContainment) {
    final LogicalGraphFactory factory = config.getLogicalGraphFactory();
    if (withGraphContainment) {
      return factory.fromCollections(getVertices(), getEdges())
        .transformGraphHead(new RenameLabel<>(GradoopConstants.DEFAULT_GRAPH_LABEL,
          GradoopConstants.DB_GRAPH_LABEL));
    } else {
      EPGMGraphHead graphHead = factory.getGraphHeadFactory()
        .createGraphHead(GradoopConstants.DB_GRAPH_LABEL);
      return factory.fromCollections(graphHead, getVertices(), getEdges());
    }
  }

  /**
   * Builds a {@link LogicalGraph} from the graph referenced by the given
   * graph variable.
   *
   * @param variable graph variable used in GDL script
   * @return LogicalGraph
   */
  public LogicalGraph getLogicalGraphByVariable(String variable) {
    EPGMGraphHead graphHead = getGraphHeadByVariable(variable);
    Collection<EPGMVertex> vertices = getVerticesByGraphVariables(variable);
    Collection<EPGMEdge> edges = getEdgesByGraphVariables(variable);

    return config.getLogicalGraphFactory().fromCollections(graphHead, vertices, edges);
  }

  /**
   * Returns a collection of all logical graph contained in the database.
   *
   * @return collection of all logical graphs
   */
  public GraphCollection getGraphCollection() {
    ExecutionEnvironment env = config.getExecutionEnvironment();

    DataSet<EPGMVertex> newVertices = env.fromCollection(getVertices())
      .filter(vertex -> vertex.getGraphCount() > 0);
    DataSet<EPGMEdge> newEdges = env.fromCollection(getEdges())
      .filter(edge -> edge.getGraphCount() > 0);

    return config.getGraphCollectionFactory()
      .fromDataSets(env.fromCollection(getGraphHeads()), newVertices, newEdges);
  }

  /**
   * Builds a {@link GraphCollection} from the graph referenced by the given
   * graph variables.
   *
   * @param variables graph variables used in GDL script
   * @return GraphCollection
   */
  public GraphCollection getGraphCollectionByVariables(String... variables) {
    Collection<EPGMGraphHead> graphHeads = getGraphHeadsByVariables(variables);
    Collection<EPGMVertex> vertices = getVerticesByGraphVariables(variables);
    Collection<EPGMEdge> edges = getEdgesByGraphVariables(variables);

    return config.getGraphCollectionFactory().fromCollections(graphHeads, vertices, edges);
  }

  /**
   * Returns all GraphHeads contained in the ASCII graph.
   *
   * @return graphHeads
   */
  public Collection<EPGMGraphHead> getGraphHeads() {
    return loader.getGraphHeads();
  }

  /**
   * Returns GraphHead by given variable.
   *
   * @param variable variable used in GDL script
   * @return graphHead or {@code null} if graph is not cached
   */
  public EPGMGraphHead getGraphHeadByVariable(String variable) {
    return loader.getGraphHeadByVariable(variable);
  }

  /**
   * Returns the graph heads assigned to the specified variables.
   *
   * @param variables variables used in the GDL script
   * @return graphHeads assigned to the variables
   */
  public Collection<EPGMGraphHead> getGraphHeadsByVariables(String... variables) {
    return loader.getGraphHeadsByVariables(variables);
  }

  /**
   * Returns all vertices contained in the ASCII graph.
   *
   * @return vertices
   */
  public Collection<EPGMVertex> getVertices() {
    return loader.getVertices();
  }

  /**
   * Returns all vertices that belong to the given graph variables.
   *
   * @param variables graph variables used in the GDL script
   * @return vertices that are contained in the graphs
   */
  public Collection<EPGMVertex> getVerticesByGraphVariables(String... variables) {
    return loader.getVerticesByGraphVariables(variables);
  }

  /**
   * Returns the vertex which is identified by the given variable. If the
   * variable cannot be found, the method returns {@code null}.
   *
   * @param variable vertex variable
   * @return vertex or {@code null} if variable is not used
   */
  public EPGMVertex getVertexByVariable(String variable) {
    return loader.getVertexByVariable(variable);
  }

  /**
   * Returns all edges contained in the ASCII graph.
   *
   * @return edges
   */
  public Collection<EPGMEdge> getEdges() {
    return loader.getEdges();
  }

  /**
   * Returns all edges that belong to the given graph variables.
   *
   * @param variables graph variables used in the GDL script
   * @return edges
   */
  public Collection<EPGMEdge> getEdgesByGraphVariables(String... variables) {
    return loader.getEdgesByGraphVariables(variables);
  }

  /**
   * Returns the edge which is identified by the given variable. If the
   * variable cannot be found, the method returns {@code null}.
   *
   * @param variable edge variable
   * @return edge or {@code null} if variable is not used
   */
  public EPGMEdge getEdgeByVariable(String variable) {
    return loader.getEdgeByVariable(variable);
  }
}