EncodeAndPruneEdges.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.fsm.dimspan.functions.preprocessing;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.gradoop.flink.algorithms.fsm.dimspan.comparison.DFSBranchComparator;
import org.gradoop.flink.algorithms.fsm.dimspan.comparison.DirectedDFSBranchComparator;
import org.gradoop.flink.algorithms.fsm.dimspan.comparison.UndirectedDFSBranchComparator;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConfig;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConstants;
import org.gradoop.flink.algorithms.fsm.dimspan.model.GraphUtils;
import org.gradoop.flink.algorithms.fsm.dimspan.model.SearchGraphUtils;
import org.gradoop.flink.algorithms.fsm.dimspan.model.UnsortedSearchGraphUtils;
import org.gradoop.flink.algorithms.fsm.dimspan.tuples.LabeledGraphIntString;
import java.util.Arrays;
import java.util.Map;
/**
* Encodes edge labels to integers.
* Drops edges with infrequent labels and isolated vertices.
*/
public class EncodeAndPruneEdges extends RichMapFunction<LabeledGraphIntString, int[]> {
/**
* edge label dictionary
*/
private Map<String, Integer> edgeDictionary = Maps.newHashMap();
/**
* flag to enable graph sorting (true=enabled)
*/
private final boolean sortGraph;
/**
* comparator used for graph sorting
*/
private final DFSBranchComparator branchComparator;
/**
* util methods to interpret and manipulate int-array encoded graphs
*/
private final SearchGraphUtils graphUtils = new UnsortedSearchGraphUtils();
/**
* Constructor
*
* @param fsmConfig FSM configuration
*/
public EncodeAndPruneEdges(DIMSpanConfig fsmConfig) {
sortGraph = fsmConfig.isBranchConstraintEnabled();
branchComparator = fsmConfig.isDirected() ?
new DirectedDFSBranchComparator() :
new UndirectedDFSBranchComparator();
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// create inverse dictionary at broadcast reception
String[] broadcast = getRuntimeContext()
.<String[]>getBroadcastVariable(DIMSpanConstants.EDGE_DICTIONARY).get(0);
for (int i = 0; i < broadcast.length; i++) {
edgeDictionary.put(broadcast[i], i);
}
}
@Override
public int[] map(LabeledGraphIntString inGraph) throws Exception {
int[][] dfsCodes = new int[0][];
// prune edges and convert to 1-edge minimum DFS codes;
// isolated vertices will be automatically removed as source and target information is stored
// attached to edges
for (int edgeId = 0; edgeId < inGraph.size(); edgeId++) {
Integer edgeLabel = edgeDictionary.get(inGraph.getEdgeLabel(edgeId));
if (edgeLabel != null) {
int sourceId = inGraph.getSourceId(edgeId);
int sourceLabel = inGraph.getSourceLabel(edgeId);
int targetId = inGraph.getTargetId(edgeId);
int targetLabel = inGraph.getTargetLabel(edgeId);
int[] dfsCode = sourceLabel <= targetLabel ?
graphUtils.multiplex(
sourceId, sourceLabel, true, edgeLabel, targetId, targetLabel) :
graphUtils.multiplex(
targetId, targetLabel, false, edgeLabel, sourceId, sourceLabel);
dfsCodes = ArrayUtils.add(dfsCodes, dfsCode);
}
}
// optionally sort 1-edge DFS codes
if (sortGraph) {
Arrays.sort(dfsCodes, branchComparator);
}
// multiplex 1-edge DFS codes
int[] outGraph = new int[dfsCodes.length * GraphUtils.EDGE_LENGTH];
int i = 0;
for (int[] dfsCode : dfsCodes) {
System.arraycopy(dfsCode, 0, outGraph, i * 6, GraphUtils.EDGE_LENGTH);
i++;
}
return outGraph;
}
}