GrowFrequentPatterns.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.fsm.dimspan.functions.mining;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.gradoop.flink.algorithms.fsm.dimspan.comparison.DFSCodeComparator;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConfig;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConstants;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DataflowStep;
import org.gradoop.flink.algorithms.fsm.dimspan.gspan.GSpanLogic;
import org.gradoop.flink.algorithms.fsm.dimspan.tuples.PatternEmbeddingsMap;
import org.gradoop.flink.algorithms.fsm.dimspan.model.Simple16Compressor;
import org.gradoop.flink.algorithms.fsm.dimspan.tuples.GraphWithPatternEmbeddingsMap;
import org.gradoop.flink.model.impl.tuples.WithCount;
import java.util.List;
/**
* {@code (graph, k-edge pattern -> embeddings) => (graph, k+1-edge pattern -> embeddings)}
*/
public class GrowFrequentPatterns
extends RichMapFunction<GraphWithPatternEmbeddingsMap, GraphWithPatternEmbeddingsMap> {
/**
* compressed k-edge frequent patterns for fast embedding map lookup
*/
private List<int[]> compressedFrequentPatterns;
/**
* uncompressed k-edge frequent patterns for pattern growth
*/
private List<int[]> frequentPatterns;
/**
* patterns with frequency for collector
*/
private List<WithCount<int[]>> patternFrequencies;
/**
* list of rightmost paths, index relates to frequent patterns
*/
private List<int[]> rightmostPaths;
/**
* pattern growth logic (directed or undirected mode)
*/
private final GSpanLogic gSpan;
/**
* flag to enable graph compression (true=enabled)
*/
private final boolean compressGraphs;
/**
* flag to enable pattern compression (true=enabled)
*/
private final boolean compressPatterns;
/**
* flag to enable pattern decompression (true=enabled)
*/
private final boolean uncompressFrequentPatterns;
/**
* flag to enable embedding compression (true=enabled)
*/
private final boolean compressEmbeddings;
/**
* flag to enable pattern verification before counting (true=enabled)
*/
private final boolean validatePatterns;
/**
* Constructor.
*
* @param gSpan pattern growth logic
* @param fsmConfig FSM Configuration
*/
public GrowFrequentPatterns(GSpanLogic gSpan, DIMSpanConfig fsmConfig) {
// set pattern growth logic for directed or undirected mode
this.gSpan = gSpan;
// cache compression flags
compressGraphs = fsmConfig.isGraphCompressionEnabled();
compressEmbeddings = fsmConfig.isEmbeddingCompressionEnabled();
compressPatterns = fsmConfig.getPatternCompressionInStep() == DataflowStep.MAP;
uncompressFrequentPatterns = fsmConfig.getPatternCompressionInStep() != DataflowStep.WITHOUT;
// cache validation flag
validatePatterns = fsmConfig.getPatternVerificationInStep() == DataflowStep.MAP;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// broadcast reception
patternFrequencies =
getRuntimeContext().getBroadcastVariable(DIMSpanConstants.FREQUENT_PATTERNS);
int patternCount = patternFrequencies.size();
this.frequentPatterns = Lists.newArrayListWithExpectedSize(patternCount);
for (WithCount<int[]> patternWithCount : patternFrequencies) {
int[] pattern = patternWithCount.getObject();
// uncompress
if (uncompressFrequentPatterns) {
pattern = Simple16Compressor.uncompress(pattern);
}
frequentPatterns.add(pattern);
}
// sort
frequentPatterns.sort(new DFSCodeComparator());
// calculate rightmost paths
this.rightmostPaths = Lists.newArrayListWithExpectedSize(patternCount);
this.compressedFrequentPatterns = Lists.newArrayListWithExpectedSize(patternCount);
for (int[] pattern : frequentPatterns) {
rightmostPaths.add(gSpan.getRightmostPathTimes(pattern));
// TODO: directly store compressed patterns at reception
compressedFrequentPatterns
.add(compressPatterns ? Simple16Compressor.compress(pattern) : pattern);
}
}
@Override
public GraphWithPatternEmbeddingsMap map(GraphWithPatternEmbeddingsMap pair) throws Exception {
// union k-1 edge frequent patterns with k-edge ones
if (pair.isFrequentPatternCollector()) {
for (WithCount<int[]> patternWithFrequency : patternFrequencies) {
pair.getMap().collect(patternWithFrequency);
}
} else {
int[] graph = pair.getGraph();
// uncompress graph
if (compressGraphs) {
graph = Simple16Compressor.uncompress(graph);
}
// execute pattern growth for all supported frequent patterns
PatternEmbeddingsMap childMap = gSpan.growPatterns(graph, pair.getMap(),
frequentPatterns, rightmostPaths, compressEmbeddings, compressedFrequentPatterns);
// drop non-minimal patterns if configured to be executed here
if (validatePatterns) {
PatternEmbeddingsMap validatedMap = PatternEmbeddingsMap.getEmptyOne();
for (int i = 0; i < childMap.getPatternCount(); i++) {
int[] pattern = childMap.getPattern(i);
if (gSpan.isMinimal(pattern)) {
int[] embeddingData = childMap.getValues()[i];
validatedMap.put(pattern, embeddingData);
}
}
childMap = validatedMap;
}
// update pattern-embedding map
pair.setPatternEmbeddings(childMap);
// compress patterns and embedding, if configured
// NOTE: graphs will remain compressed
if (compressPatterns) {
Simple16Compressor.compressPatterns(pair.getMap());
}
if (compressEmbeddings) {
Simple16Compressor.compressEmbeddings(pair.getMap());
}
}
return pair;
}
}