ThinkLikeAnEmbeddingTFSM.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.algorithms.fsm.transactional;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConstants;
import org.gradoop.flink.algorithms.fsm.transactional.tle.ThinkLikeAnEmbeddingFSMBase;
import org.gradoop.flink.algorithms.fsm.transactional.common.FSMConfig;
import org.gradoop.flink.algorithms.fsm.dimspan.functions.mining.Frequent;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.MinEdgeCount;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.IsResult;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.TFSMSingleEdgeEmbeddings;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.TFSMSubgraphDecoder;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.TFSMSubgraphOnly;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.ToTFSMGraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.TFSMWrapInSubgraphEmbeddings;
import org.gradoop.flink.algorithms.fsm.transactional.tle.pojos.TFSMGraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.tuples.TFSMSubgraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.tuples.TFSMSubgraphEmbeddings;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;

/**
 * abstract superclass of different implementations of the gSpan frequent
 * subgraph mining algorithm as Gradoop operator
 */
public class ThinkLikeAnEmbeddingTFSM
  extends ThinkLikeAnEmbeddingFSMBase<TFSMGraph, TFSMSubgraph, TFSMSubgraphEmbeddings> {

  /**
   * Constructor
   *
   * @param fsmConfig frequent subgraph mining configuration
   */
  public ThinkLikeAnEmbeddingTFSM(FSMConfig fsmConfig) {
    super(fsmConfig);
  }

  /**
   * Core mining method.
   *
   * @param transactions search space
   *
   * @return frequent subgraphs
   */
  public DataSet<GraphTransaction> execute(DataSet<GraphTransaction> transactions) {

    transactions = preProcess(transactions);

    DataSet<TFSMGraph> graphs = transactions
      .map(new ToTFSMGraph());

    DataSet<TFSMSubgraphEmbeddings> embeddings = graphs
      .flatMap(new TFSMSingleEdgeEmbeddings(fsmConfig));

    // ITERATION HEAD
    IterativeDataSet<TFSMSubgraphEmbeddings> iterative = embeddings
      .iterate(fsmConfig.getMaxEdgeCount());

    // ITERATION BODY

    // get frequent subgraphs
    DataSet<TFSMSubgraphEmbeddings> parentEmbeddings = iterative
      .filter(new IsResult<>(false));

    DataSet<TFSMSubgraph> frequentSubgraphs =
      getFrequentSubgraphs(parentEmbeddings);

    parentEmbeddings =
      filterByFrequentSubgraphs(parentEmbeddings, frequentSubgraphs);

    DataSet<TFSMSubgraphEmbeddings> childEmbeddings =
      growEmbeddingsOfFrequentSubgraphs(parentEmbeddings, frequentSubgraphs);

    DataSet<TFSMSubgraphEmbeddings> resultIncrement = frequentSubgraphs
      .map(new TFSMWrapInSubgraphEmbeddings());

    DataSet<TFSMSubgraphEmbeddings> resultAndEmbeddings = iterative
      .filter(new IsResult<>(true))
      .union(resultIncrement)
      .union(childEmbeddings);

    // ITERATION FOOTER

    DataSet<TFSMSubgraph>  allFrequentSubgraphs = iterative
      .closeWith(resultAndEmbeddings, childEmbeddings)
      .filter(new IsResult<>(true))
      .map(new TFSMSubgraphOnly());


    if (fsmConfig.getMinEdgeCount() > 1) {
      allFrequentSubgraphs = allFrequentSubgraphs
        .filter(new MinEdgeCount<>(fsmConfig));
    }

    return allFrequentSubgraphs.map(new TFSMSubgraphDecoder(config));
  }

  /**
   * Determines frequent subgraphs in a set of embeddings.
   *
   * @param embeddings set of embeddings
   * @return frequent subgraphs
   */
  private DataSet<TFSMSubgraph> getFrequentSubgraphs(
    DataSet<TFSMSubgraphEmbeddings> embeddings) {
    return embeddings
        .map(new TFSMSubgraphOnly())
        .groupBy(0)
        .sum(1)
        .filter(new Frequent<>())
        .withBroadcastSet(minFrequency, DIMSpanConstants.MIN_FREQUENCY);
  }

}