Neighborhood.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.dataintegration.transformation.impl;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.dataintegration.transformation.functions.CreateNeighborList;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.TargetId;

import java.util.List;
import java.util.Objects;

/**
 * Utility class for vertex neighborhoods. It provides the {@link EdgeDirection} enum and a static
 * method that computes the neighbors of a given set of vertices.
 */
public class Neighborhood {

  /**
   * Builds a {@link DataSet} of tuples in which the first field is a vertex taken from
   * {@code centralVertices} and the second field is a list of neighbors of that vertex, derived
   * from the edges of {@code graph}.
   * <p>
   * For {@link EdgeDirection#INCOMING} the edges are matched to the central vertices by their
   * target id, for {@link EdgeDirection#OUTGOING} by their source id. For
   * {@link EdgeDirection#UNDIRECTED} both results are computed and concatenated with
   * {@code union}; this method does not merge them any further.
   *
   * @param graph A Graph the operation is executed on.
   * @param centralVertices The vertices the neighborhood should be calculated for.
   * @param edgeDirection The relevant direction for neighbors.
   * @return A Dataset of tuples containing vertices and their neighborhood.
   * @throws NullPointerException if any of the parameters is null.
   */
  public static DataSet<Tuple2<EPGMVertex, List<NeighborhoodVertex>>> getPerVertex(LogicalGraph graph,
                 DataSet<EPGMVertex> centralVertices, EdgeDirection edgeDirection) {
    Objects.requireNonNull(graph);
    Objects.requireNonNull(centralVertices);
    Objects.requireNonNull(edgeDirection);

    if (edgeDirection == EdgeDirection.UNDIRECTED) {
      // Both directions are relevant: build the incoming part first, then the outgoing part.
      DataSet<Tuple2<EPGMVertex, List<NeighborhoodVertex>>> viaTarget =
        neighborsViaTarget(graph, centralVertices, edgeDirection);
      DataSet<Tuple2<EPGMVertex, List<NeighborhoodVertex>>> viaSource =
        neighborsViaSource(graph, centralVertices, edgeDirection);
      return viaTarget.union(viaSource);
    }

    if (edgeDirection == EdgeDirection.INCOMING) {
      return neighborsViaTarget(graph, centralVertices, edgeDirection);
    }

    // OUTGOING is the only remaining direction.
    return neighborsViaSource(graph, centralVertices, edgeDirection);
  }

  /**
   * Co-groups the edges of the graph with the central vertices on {@code edge.targetId == vertex.id}
   * and lets {@link CreateNeighborList} build the result tuples.
   *
   * @param graph The graph providing the edges.
   * @param centralVertices The vertices the neighborhood should be calculated for.
   * @param edgeDirection The direction handed over to {@link CreateNeighborList}.
   * @return The tuples created by {@link CreateNeighborList} for edges pointing to a central vertex.
   */
  private static DataSet<Tuple2<EPGMVertex, List<NeighborhoodVertex>>> neighborsViaTarget(
    LogicalGraph graph, DataSet<EPGMVertex> centralVertices, EdgeDirection edgeDirection) {
    return graph.getEdges()
      .coGroup(centralVertices)
      .where(new TargetId<>())
      .equalTo(new Id<>())
      .with(new CreateNeighborList(edgeDirection));
  }

  /**
   * Co-groups the edges of the graph with the central vertices on {@code edge.sourceId == vertex.id}
   * and lets {@link CreateNeighborList} build the result tuples.
   *
   * @param graph The graph providing the edges.
   * @param centralVertices The vertices the neighborhood should be calculated for.
   * @param edgeDirection The direction handed over to {@link CreateNeighborList}.
   * @return The tuples created by {@link CreateNeighborList} for edges leaving a central vertex.
   */
  private static DataSet<Tuple2<EPGMVertex, List<NeighborhoodVertex>>> neighborsViaSource(
    LogicalGraph graph, DataSet<EPGMVertex> centralVertices, EdgeDirection edgeDirection) {
    return graph.getEdges()
      .coGroup(centralVertices)
      .where(new SourceId<>())
      .equalTo(new Id<>())
      .with(new CreateNeighborList(edgeDirection));
  }

  /**
   * The possible edge directions, as seen from the central vertex.
   */
  public enum EdgeDirection {
    /**
     * Edges that start at the neighbor and end at the central vertex.
     */
    INCOMING,

    /**
     * Edges that start at the central vertex and end at the neighbor.
     */
    OUTGOING,

    /**
     * The edge direction is ignored: INCOMING and OUTGOING edges are both taken into account for
     * the calculation.
     */
    UNDIRECTED
  }
}