DFSCodeComparator.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.fsm.dimspan.comparison;
import org.gradoop.flink.algorithms.fsm.dimspan.model.DFSCodeUtils;
import java.io.Serializable;
import java.util.Comparator;
/**
* Comparator for DFS codes based on gSpan lexicographical order.
* See <a href="https://www.cs.ucsb.edu/~xyan/software/gSpan.htm">gSpan</a>
*/
public class DFSCodeComparator implements Comparator<int[]>, Serializable {
/**
* util methods to interpret int-array encoded patterns
*/
private final DFSCodeUtils dfsCodeUtils = new DFSCodeUtils();
@Override
public int compare(int[] a, int[] b) {
int comparison;
boolean thisIsRoot = a.length == 0;
boolean thatIsRoot = b.length == 0;
// root is always smaller
if (thisIsRoot && ! thatIsRoot) {
comparison = -1;
} else if (thatIsRoot && !thisIsRoot) {
comparison = 1;
// none is root
} else {
comparison = 0;
int thisSize = dfsCodeUtils.getEdgeCount(a);
int thatSize = dfsCodeUtils.getEdgeCount(b);
boolean sameSize = thisSize == thatSize;
int minSize = sameSize || thisSize < thatSize ? thisSize : thatSize;
// compare extensions
for (int edgeTime = 0; edgeTime < minSize; edgeTime++) {
int thisFromTime = dfsCodeUtils.getFromId(a, edgeTime);
int thatFromTime = dfsCodeUtils.getFromId(b, edgeTime);
int thisToTime = dfsCodeUtils.getToId(a, edgeTime);
int thatToTime = dfsCodeUtils.getToId(b, edgeTime);
// no difference is times
if (thisFromTime == thatFromTime && thisToTime == thatToTime) {
// compare from Labels
int thisFromLabel = dfsCodeUtils.getFromLabel(a, edgeTime);
int thatFromLabel = dfsCodeUtils.getFromLabel(b, edgeTime);
comparison = thisFromLabel - thatFromLabel;
if (comparison == 0) {
// compare direction
boolean thisIsOutgoing = dfsCodeUtils.isOutgoing(a, edgeTime);
boolean thatIsOutgoing = dfsCodeUtils.isOutgoing(b, edgeTime);
if (thisIsOutgoing && !thatIsOutgoing) {
comparison = -1;
} else if (thatIsOutgoing && !thisIsOutgoing) {
comparison = 1;
} else {
// compare edge Labels
int thisEdgeLabel = dfsCodeUtils.getEdgeLabel(a, edgeTime);
int thatEdgeLabel = dfsCodeUtils.getEdgeLabel(b, edgeTime);
comparison = thisEdgeLabel - thatEdgeLabel;
if (comparison == 0) {
// compare to Labels
int thisToLabel = dfsCodeUtils.getToLabel(a, edgeTime);
int thatToLabel = dfsCodeUtils.getToLabel(b, edgeTime);
comparison = thisToLabel - thatToLabel;
}
}
}
// time differences
} else {
// compare backtracking
boolean thisIsBackwards = thisToTime <= thisFromTime;
boolean thatIsBackwards = thatToTime <= thatFromTime;
if (thisIsBackwards && !thatIsBackwards) {
comparison = -1;
} else if (thatIsBackwards && !thisIsBackwards) {
comparison = 1;
} else {
// both forwards
if (thatIsBackwards) {
// back to earlier vertex is smaller
comparison = thisToTime - thatToTime;
// both backwards
} else {
// forwards from later vertex is smaller
comparison = thatFromTime - thisFromTime;
}
}
}
// stop iteration as soon as one extension differs
if (comparison != 0) {
break;
}
}
// one is parent of other
if (comparison == 0 && !sameSize) {
// this is child, cause it has further traversals
if (thisSize > thatSize) {
comparison = 1;
// that is child, cause it has further traversals
} else {
comparison = -1;
}
}
}
return comparison;
}
}