TransactionalFSMBase.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.algorithms.fsm.transactional.tle;

import org.apache.flink.api.java.DataSet;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConstants;
import org.gradoop.flink.algorithms.fsm.dimspan.functions.mining.Frequent;
import org.gradoop.flink.algorithms.fsm.transactional.common.FSMConfig;
import org.gradoop.flink.algorithms.fsm.transactional.common.TFSMConstants;
import org.gradoop.flink.algorithms.fsm.transactional.common.functions.DropPropertiesAndGraphContainment;
import org.gradoop.flink.algorithms.fsm.transactional.common.functions.EdgeLabels;
import org.gradoop.flink.algorithms.fsm.transactional.common.functions.FilterEdgesByLabel;
import org.gradoop.flink.algorithms.fsm.transactional.common.functions.FilterVerticesByLabel;
import org.gradoop.flink.algorithms.fsm.transactional.common.functions.NotEmpty;
import org.gradoop.flink.algorithms.fsm.transactional.common.functions.VertexLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.MinFrequency;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.api.operators.UnaryCollectionToCollectionOperator;
import org.gradoop.flink.model.impl.functions.tuple.ValueOfWithCount;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import org.gradoop.flink.model.impl.operators.count.Count;
import org.gradoop.flink.util.GradoopFlinkConfig;

/**
 * Superclass of frequent subgraph implementations in the graph transaction setting.
 */
public abstract class TransactionalFSMBase implements UnaryCollectionToCollectionOperator {

  /**
   * FSM configuration
   */
  protected final FSMConfig fsmConfig;

  /**
   * search space size
   */
  protected DataSet<Long> graphCount;

  /**
   * minimum frequency for patterns to be considered to be frequent
   */
  protected DataSet<Long> minFrequency;

  /**
   * Gradoop configuration
   */
  protected GradoopFlinkConfig config;

  /**
   * Constructor.
   *
   * @param fsmConfig FSM configuration
   */
  public TransactionalFSMBase(FSMConfig fsmConfig) {
    this.fsmConfig = fsmConfig;
  }

  @Override
  public GraphCollection execute(GraphCollection collection)  {
    config = collection.getConfig();

    DataSet<GraphTransaction> input = collection
      .getGraphTransactions();

    DataSet<GraphTransaction> output = execute(input);

    return collection.getFactory()
      .fromTransactions(output);
  }

  /**
   * Executes the algorithm for a dataset of graphs in transactional representation.
   *
   * @param transactions dataset of graphs
   *
   * @return frequent patterns as dataset of graphs
   */
  protected abstract DataSet<GraphTransaction> execute(DataSet<GraphTransaction> transactions);

  /**
   * Triggers the label-frequency base preprocessing
   *
   * @param transactions input
   * @return preprocessed input
   */
  protected DataSet<GraphTransaction> preProcess(DataSet<GraphTransaction> transactions) {
    transactions = transactions
      .map(new DropPropertiesAndGraphContainment());

    this.graphCount = Count
      .count(transactions);

    this.minFrequency = graphCount
      .map(new MinFrequency(fsmConfig));

    DataSet<String> frequentVertexLabels = transactions
      .flatMap(new VertexLabels())
      .groupBy(0)
      .sum(1)
      .filter(new Frequent<>())
      .withBroadcastSet(minFrequency, DIMSpanConstants.MIN_FREQUENCY)
      .map(new ValueOfWithCount<>());

    transactions = transactions
      .map(new FilterVerticesByLabel())
      .withBroadcastSet(frequentVertexLabels, TFSMConstants.FREQUENT_VERTEX_LABELS);

    DataSet<String> frequentEdgeLabels = transactions
      .flatMap(new EdgeLabels())
      .groupBy(0)
      .sum(1)
      .filter(new Frequent<>())
      .withBroadcastSet(minFrequency, DIMSpanConstants.MIN_FREQUENCY)
      .map(new ValueOfWithCount<>());

    transactions = transactions
      .map(new FilterEdgesByLabel())
      .withBroadcastSet(frequentEdgeLabels, TFSMConstants.FREQUENT_EDGE_LABELS);

    transactions = transactions
      .filter(new NotEmpty());

    return transactions;
  }
}