自定义数量和超时时间的 CountWithTimeTrigger
package vehicle.excavate.operator;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
 * A window trigger that fires when either a maximum number of elements has been
 * collected or a processing-time timeout has elapsed since the first element of
 * the current batch, whichever happens first.
 *
 * <p>The trigger itself only returns {@link TriggerResult#FIRE}; it does not purge
 * the window contents. Wrap it in a purging trigger (or return a purging result)
 * if the window should be emptied after every fire.
 *
 * <p>Bookkeeping is kept in a single partitioned {@link MapState} named
 * {@code "mapState"} with two entries: {@code "count"} (elements seen since the
 * last fire) and {@code "time"} (deadline of the pending timeout timer, or
 * {@link Long#MAX_VALUE} when no timer is pending).
 *
 * @param <T> type of the elements this trigger observes
 * @param <W> type of the windows this trigger operates on
 */
public class CountWithTimeTrigger<T, W extends Window> extends Trigger<T, W> {

    private static final long serialVersionUID = 1L;

    /** Sentinel deadline meaning "no timeout timer is currently pending". */
    private static final long NO_DEADLINE = Long.MAX_VALUE;

    /** Map-state key under which the number of elements since the last fire is stored. */
    private static final String countStr = "count";

    /** Map-state key under which the deadline of the pending timeout timer is stored. */
    private static final String timeStr = "time";

    /** Number of elements that causes a fire. */
    private final long maxCount;

    /** Timeout in milliseconds, measured from the first element after a fire. */
    private final long timeoutMs;

    private final MapStateDescriptor<String, Long> mapStateDesc =
            new MapStateDescriptor<>("mapState", String.class, Long.class);

    private CountWithTimeTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /**
     * Called for each element that is added to a window.
     *
     * <p>Fires when the element count reaches {@code maxCount} or when the stored
     * deadline has already passed; otherwise increments the count and, for the first
     * element of a batch, registers a processing-time timer {@code timeoutMs} ahead.
     *
     * @return {@link TriggerResult#FIRE} when a threshold is hit, else {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        final MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);

        // Absent entries mean "first call": treat them as count 0 / no pending deadline.
        final Long storedCount = mapState.get(countStr);
        final Long storedDeadline = mapState.get(timeStr);
        final long count = storedCount == null ? 0L : storedCount;
        final long deadline = storedDeadline == null ? NO_DEADLINE : storedDeadline;

        // Use the context's clock so the comparison agrees with the timers registered on it.
        final long now = ctx.getCurrentProcessingTime();
        final long newCount = count + 1;

        if (now >= deadline || newCount >= maxCount) {
            // The batch is being emitted now; drop the pending timeout so it does not linger.
            if (deadline != NO_DEADLINE) {
                ctx.deleteProcessingTimeTimer(deadline);
            }
            return fire(mapState);
        }

        if (deadline == NO_DEADLINE) {
            // First element of a new batch: start the timeout.
            final long nextDeadline = now + timeoutMs;
            mapState.put(timeStr, nextDeadline);
            ctx.registerProcessingTimeTimer(nextDeadline);
        }
        mapState.put(countStr, newCount);
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a registered event-time timer fires. This trigger registers no
     * event-time timers, so it always continues.
     */
    @Override
    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a registered processing-time timer fires.
     *
     * <p>Fires only if {@code time} matches the deadline currently stored in state;
     * any other timer is treated as stale and ignored.
     */
    @Override
    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        final MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
        final Long deadline = mapState.get(timeStr);
        // A missing entry (state already cleared) must not blow up on unboxing.
        if (deadline != null && deadline == time) {
            return fire(mapState);
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called upon removal of the corresponding window: deletes the pending timeout
     * timer, if any, and clears the trigger's state.
     */
    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        final MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
        final Long deadlineValue = mapState.get(timeStr);
        // The entry is absent if no element was ever seen for this window.
        if (deadlineValue != null && deadlineValue != NO_DEADLINE) {
            ctx.deleteProcessingTimeTimer(deadlineValue);
        }
        mapState.clear();
    }

    /** Resets the count and deadline for the next batch and signals a fire. */
    private TriggerResult fire(MapState<String, Long> mapState) throws Exception {
        mapState.put(timeStr, NO_DEADLINE);
        mapState.put(countStr, 0L);
        return TriggerResult.FIRE;
    }

    @Override
    public String toString() {
        return "CountWithTimeTrigger(maxCount=" + maxCount + ", timeoutMs=" + timeoutMs + ")";
    }

    /**
     * Creates a trigger that fires after {@code maxCount} elements or
     * {@code intervalMs} milliseconds, whichever comes first.
     *
     * @param maxCount   number of elements that causes a fire
     * @param intervalMs timeout in milliseconds, measured in processing time from the
     *                   first element after the previous fire
     * @param <T>        type of the elements
     * @param <W>        type of the windows
     * @return a new trigger instance
     */
    public static <T, W extends Window> CountWithTimeTrigger<T, W> of(long maxCount, long intervalMs) {
        return new CountWithTimeTrigger<>(maxCount, intervalMs);
    }
}
调用:
// Route every element into one global window; the trigger alone decides when results are emitted.
source.windowAll(GlobalWindows.create())
// Fire after 10 elements, or 1000 ms after the first buffered element, whichever comes first.
// PurgingTrigger empties the window on each fire so elements do not accumulate.
.trigger(PurgingTrigger.of(CountWithTimeTrigger.of(10, 1000)))
.process(...)
这里始终使用同一个全局窗口(GlobalWindows)。PurgingTrigger 的作用是在每次触发计算后清空窗口;
如果不清空窗口,窗口中的数据会一直累加。
// Route every element into one global window; the trigger alone decides when results are emitted.
source.windowAll(GlobalWindows.create())
// Fire after 10 elements, or 1000 ms after the first buffered element, whichever comes first.
// PurgingTrigger empties the window on each fire so elements do not accumulate.
.trigger(PurgingTrigger.of(CountWithTimeTrigger.of(10, 1000)))
// Process each emitted batch, then hand the results to the sink.
.process(new FakePlateOperator())
.addSink(new MySink());
或者不使用 PurgingTrigger,而是在触发计算时直接返回 FIRE_AND_PURGE:
/**
 * Resets the trigger's bookkeeping for the next batch and signals that the
 * window should be evaluated and then purged.
 *
 * @param mapState the trigger's state map holding the count and deadline entries
 * @return {@link TriggerResult#FIRE_AND_PURGE}
 */
private TriggerResult fire(MapState<String, Long> mapState) throws Exception {
    // Restart the element count for the next batch.
    mapState.put(countStr, 0L);
    // Long.MAX_VALUE marks that no timeout deadline is pending.
    mapState.put(timeStr, Long.MAX_VALUE);
    return TriggerResult.FIRE_AND_PURGE;
}