EdgesFromLocalTransitiveClosure.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.dataintegration.transformation.functions;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.EdgeFactory;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMEdgeFactory;
import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/**
 * This function supports the calculation of the transitive closure in the direct neighborhood of a
 * vertex. For each transitive connection an edge is created containing the properties of the
 * central vertex and the labels of the first and second edge that need to be traversed for the
 * transitive connection.
 *
 * @param <V> The vertex type.
 * @param <E> The edge type.
 */
public class EdgesFromLocalTransitiveClosure<V extends Vertex, E extends Edge>
  implements CoGroupFunction<Tuple2<V, List<NeighborhoodVertex>>,
    Tuple2<V, List<NeighborhoodVertex>>, E>, ResultTypeQueryable<E> {

  /**
   * The property key used to store the original vertex label on the new edge.
   */
  public static final String ORIGINAL_VERTEX_LABEL = "originalVertexLabel";

  /**
   * The property key used to store the first label of the combined edges on the new edge.
   */
  public static final String FIRST_EDGE_LABEL = "firstEdgeLabel";

  /**
   * The property key used to store the second label of the combined edges on the new edge.
   */
  public static final String SECOND_EDGE_LABEL = "secondEdgeLabel";

  /**
   * The type of the edges created by the factory.
   */
  private final Class<E> edgeType;

  /**
   * A single edge instance that is mutated and re-emitted for every created edge, to reduce
   * object instantiations.
   */
  private final E reuse;

  /**
   * The constructor of the CoGroup function to create new edges based on transitivity.
   *
   * @param newEdgeLabel The edge label of the newly created edges.
   * @param factory The {@link EdgeFactory} (e.g. an {@link EPGMEdgeFactory}) new edges are
   *                created with.
   * @throws NullPointerException if {@code newEdgeLabel} or {@code factory} is {@code null}.
   */
  public EdgesFromLocalTransitiveClosure(String newEdgeLabel, EdgeFactory<E> factory) {
    this.edgeType = Objects.requireNonNull(factory, "factory must not be null").getType();
    this.reuse = factory.createEdge(
      Objects.requireNonNull(newEdgeLabel, "newEdgeLabel must not be null"),
      GradoopId.NULL_VALUE, GradoopId.NULL_VALUE);
  }

  /**
   * Creates one edge for every pair (incoming neighbor, outgoing neighbor) of a central vertex.
   * The new edge points from the incoming neighbor to the outgoing neighbor. It carries the
   * properties of the central vertex, the central vertex label under
   * {@link #ORIGINAL_VERTEX_LABEL}, and the labels of the two traversed edges under
   * {@link #FIRST_EDGE_LABEL} and {@link #SECOND_EDGE_LABEL}.
   *
   * <p>The same edge instance is modified and collected for every pair, so downstream code must
   * not hold on to emitted edges without copying them.
   *
   * @param incoming the incoming neighborhood of a central vertex (presumably grouped by the
   *                 central vertex id - confirm against the caller); only the first element is read
   * @param outgoing the outgoing neighborhood of the same central vertex; only the first element
   *                 is read
   * @param edges    the collector receiving the newly created edges
   */
  @Override
  public void coGroup(Iterable<Tuple2<V, List<NeighborhoodVertex>>> incoming,
                      Iterable<Tuple2<V, List<NeighborhoodVertex>>> outgoing,
                      Collector<E> edges) {

    Iterator<Tuple2<V, List<NeighborhoodVertex>>> incIt = incoming.iterator();
    Iterator<Tuple2<V, List<NeighborhoodVertex>>> outIt = outgoing.iterator();

    // A transitive connection needs both an incoming and an outgoing neighborhood.
    if (!incIt.hasNext() || !outIt.hasNext()) {
      return;
    }

    // Each of the incoming and outgoing sets should be represented only once,
    // so only the first element of each side is consumed.
    Tuple2<V, List<NeighborhoodVertex>> first = incIt.next();
    V centralVertex = first.f0;
    List<NeighborhoodVertex> in = first.f1;
    List<NeighborhoodVertex> out = outIt.next().f1;

    // An empty side yields an empty cross product: skip preparing the reused edge (and writing
    // properties through the central vertex's property container) when nothing will be emitted.
    if (in.isEmpty() || out.isEmpty()) {
      return;
    }

    // NOTE(review): setProperties presumably stores the vertex's property container by reference,
    // so the setProperty calls below may also be visible on centralVertex - confirm in Gradoop.
    reuse.setProperties(centralVertex.getProperties());
    reuse.setProperty(ORIGINAL_VERTEX_LABEL, centralVertex.getLabel());

    for (NeighborhoodVertex source : in) {
      for (NeighborhoodVertex target : out) {
        reuse.setId(GradoopId.get());
        reuse.setSourceId(source.getNeighborId());
        reuse.setTargetId(target.getNeighborId());

        reuse.setProperty(FIRST_EDGE_LABEL, source.getConnectingEdgeLabel());
        reuse.setProperty(SECOND_EDGE_LABEL, target.getConnectingEdgeLabel());

        edges.collect(reuse);
      }
    }
  }

  @Override
  public TypeInformation<E> getProducedType() {
    return TypeInformation.of(edgeType);
  }
}