BuildFatVertex.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.RichGroupCombineFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.FatVertex;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.IdPair;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.TripleWithDirection;

/**
 * Combines a group of {@link TripleWithDirection} into a single {@link FatVertex}.
 *
 * <p>The vertex id is taken from the source id of the first triple in the group.
 * NOTE(review): this assumes all triples of a group share that source id (i.e. it is
 * the grouping key) — confirm at the call site.
 *
 * <ul>
 *   <li>Outgoing triples contribute vertex candidates (source vertices of the matched
 *       query edges) and an outgoing edge-candidate entry keyed by (edge id, target id).</li>
 *   <li>Incoming triples contribute vertex candidates (target vertices of the matched
 *       query edges), per-query-edge incoming candidate counts, and the triple's target
 *       id as a parent id.</li>
 * </ul>
 *
 * <p>The same {@link FatVertex} instance is emitted for every group. Its collections are
 * replaced with fresh instances at the start of each group.
 */
public class BuildFatVertex
  extends RichGroupCombineFunction<TripleWithDirection, FatVertex> {

  /**
   * serial version uid
   */
  private static final long serialVersionUID = 42L;

  /**
   * Output instance reused across groups to reduce object instantiations.
   */
  private final FatVertex reuseVertex;

  /**
   * GDL query string; parsed into the query handler in {@link #open(Configuration)}.
   */
  private final String query;

  /**
   * Query handler built from the GDL query. It is transient, so it is recreated in
   * {@link #open(Configuration)} rather than serialized with the function.
   */
  private transient QueryHandler qHandler;

  /**
   * Constructor
   *
   * @param query GDL query
   */
  public BuildFatVertex(String query) {
    this.query = query;
    this.reuseVertex = new FatVertex();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.qHandler = new QueryHandler(query);
  }

  /**
   * Folds all triples of one group into the reused fat vertex and emits it.
   *
   * <p>An empty group emits nothing. Otherwise the reused vertex would still hold null
   * fields (first call) or the state of the previously combined group.
   *
   * @param triples   edge triples of one group
   * @param collector receives the resulting fat vertex
   * @throws Exception if processing fails
   */
  @Override
  public void combine(Iterable<TripleWithDirection> triples,
    Collector<FatVertex> collector) throws Exception {

    boolean initialized = false;

    for (TripleWithDirection triple : triples) {
      if (!initialized) {
        initFatVertex(triple);
        initialized = true;
      }

      if (triple.isOutgoing()) {
        processOutgoingEdgeTriple(triple);
      } else {
        processIncomingEdgeTriple(triple);
      }
    }

    // only emit a vertex that was (re)initialized for this group
    if (initialized) {
      collector.collect(reuseVertex);
    }
  }

  /**
   * Initializes the fat vertex from the first triple of a group.
   *
   * <p>Sets the vertex id to the triple's source id, replaces all collections with empty
   * ones, allocates one incoming-count slot per query edge, and marks the vertex as
   * updated.
   *
   * @param triple first edge triple of the group
   */
  private void initFatVertex(TripleWithDirection triple) {
    reuseVertex.setVertexId(triple.getSourceId());
    reuseVertex.setCandidates(Lists.newArrayList());
    reuseVertex.setParentIds(Lists.newArrayList());
    reuseVertex.setIncomingCandidateCounts(new int[qHandler.getEdgeCount()]);
    reuseVertex.setEdgeCandidates(Maps.newHashMap());
    reuseVertex.setUpdated(true);
  }

  /**
   * Updates the vertex candidates and outgoing edges of the fat vertex based on the
   * given outgoing edge triple.
   *
   * <p>For every query edge the triple is a candidate for, the query edge's source vertex
   * becomes a vertex candidate.
   *
   * @param triple outgoing edge triple
   */
  private void processOutgoingEdgeTriple(TripleWithDirection triple) {
    boolean[] candidates = triple.getCandidates();
    for (int eQ = 0; eQ < candidates.length; eQ++) {
      if (candidates[eQ]) {
        updateCandidates(qHandler.getEdgeById((long) eQ).getSourceVertexId());
      }
    }
    // update outgoing edges (OUT_CA)
    updateOutgoingEdges(triple);
  }

  /**
   * Adds a query vertex id to the candidates of the fat vertex, ignoring duplicates.
   *
   * @param candidate query vertex id
   */
  private void updateCandidates(Long candidate) {
    if (!reuseVertex.getCandidates().contains(candidate)) {
      reuseVertex.getCandidates().add(candidate);
    }
  }

  /**
   * Records the candidate array of the given outgoing triple in the fat vertex.
   *
   * <p>The entry is keyed by the triple's (edge id, target id). The array is stored by
   * reference, not copied.
   *
   * @param triple outgoing edge triple
   */
  private void updateOutgoingEdges(TripleWithDirection triple) {
    IdPair idPair = new IdPair();
    idPair.setEdgeId(triple.getEdgeId());
    idPair.setTargetId(triple.getTargetId());
    reuseVertex.getEdgeCandidates().put(idPair, triple.getCandidates());
  }

  /**
   * Updates the vertex candidates, parent ids and incoming edge candidate counts of the
   * fat vertex based on the given incoming edge triple.
   *
   * <p>For every query edge the triple is a candidate for, that edge's incoming count is
   * incremented and its target vertex becomes a vertex candidate. If the triple matches at
   * least one query edge, its target id is added as a parent id.
   *
   * @param triple incoming edge triple
   */
  private void processIncomingEdgeTriple(TripleWithDirection triple) {
    boolean[] candidates = triple.getCandidates();
    boolean matchesAnyQueryEdge = false;

    for (int eQ = 0; eQ < candidates.length; eQ++) {
      if (candidates[eQ]) {
        // update incoming edge counts (IN_CA)
        reuseVertex.getIncomingCandidateCounts()[eQ]++;
        // update vertex candidates (CA)
        updateCandidates(qHandler.getEdgeById((long) eQ).getTargetVertexId());
        matchesAnyQueryEdge = true;
      }
    }

    // update parent ids (P_IDs); loop-invariant, so done once per triple
    if (matchesAnyQueryEdge) {
      updateParentIds(triple);
    }
  }

  /**
   * Adds the target id of the given triple to the parent ids, ignoring duplicates.
   *
   * @param triple incoming edge triple
   */
  private void updateParentIds(TripleWithDirection triple) {
    if (!reuseVertex.getParentIds().contains(triple.getTargetId())) {
      reuseVertex.getParentIds().add(triple.getTargetId());
    }
  }
}