package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.ProcessorClassLoaderTLHolder;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.StreamSource;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:lib/hazelcast-5.3.7.jar:com/hazelcast/jet/impl/pipeline/transform/StreamSourceTransform.class */
/**
 * Pipeline transform that represents a streaming source.
 *
 * <p>It holds the function that turns an {@link EventTimePolicy} into the
 * source's {@link ProcessorMetaSupplier} and, in {@link #addToDag}, plans
 * either a single source vertex or a source vertex followed by a
 * watermark-inserting vertex.
 *
 * <p>Serialization is hand-written ({@code writeObject}/{@code readObject})
 * so that the meta-supplier function can be deserialized under the class
 * loader looked up for this transform's name.
 *
 * @param <T> type of the items the source emits
 */
public class StreamSourceTransform<T> extends AbstractTransform implements StreamSource<T> {
    private static final long serialVersionUID = 1L;

    /** Builds the source's processor meta-supplier from the event time policy in effect. */
    public FunctionEx<? super EventTimePolicy<? super T>, ? extends ProcessorMetaSupplier> metaSupplierFn;

    /** Set by {@link #onAssignToStage()}; guards against assigning the source twice. */
    private boolean isAssignedToStage;

    /** When {@code true}, {@link #addToDag} never adds a separate watermark-inserting vertex. */
    private boolean emitsWatermarks;

    /** Event time policy to apply; {@code null} until {@link #setEventTimePolicy} is called. */
    @Nullable
    private EventTimePolicy<? super T> eventTimePolicy;

    private boolean supportsNativeTimestamps;

    private long partitionIdleTimeout = EventTimePolicy.DEFAULT_IDLE_TIMEOUT;

    /**
     * Creates a source transform with no upstream transforms.
     *
     * @param name                     name of the transform
     * @param metaSupplierFn           function creating the processor meta-supplier
     *                                 from an event time policy
     * @param emitsWatermarks          whether the planned source vertex is used as-is
     *                                 even when an event time policy is set (see
     *                                 {@link #addToDag})
     * @param supportsNativeTimestamps value reported by {@link #supportsNativeTimestamps()}
     */
    public StreamSourceTransform(
            @Nonnull String name,
            @Nonnull FunctionEx<? super EventTimePolicy<? super T>, ? extends ProcessorMetaSupplier> metaSupplierFn,
            boolean emitsWatermarks,
            boolean supportsNativeTimestamps
    ) {
        // An explicit type witness is required here: a cast gives no target type for
        // inference, so "(List<Transform>) Collections.emptyList()" does not compile.
        super(name, Collections.<Transform>emptyList());
        this.metaSupplierFn = metaSupplierFn;
        this.emitsWatermarks = emitsWatermarks;
        this.supportsNativeTimestamps = supportsNativeTimestamps;
    }

    /**
     * Marks this source as assigned to a source stage.
     *
     * @throws IllegalStateException if the source was already assigned
     */
    public void onAssignToStage() {
        if (this.isAssignedToStage) {
            throw new IllegalStateException("Source " + name() + " was already assigned to a source stage");
        }
        this.isAssignedToStage = true;
    }

    /**
     * Plans this source into the DAG.
     *
     * <p>If the source is flagged as emitting watermarks, or no event time
     * policy is set, a single vertex is added. Otherwise the source vertex is
     * followed, over an isolated edge, by a second vertex named
     * {@code <name>-add-timestamps} that runs
     * {@link Processors#insertWatermarksP} with the event time policy.
     *
     * @param planner planner that owns the DAG being built
     * @param context pipeline context used to determine the local parallelism
     */
    @Override
    public void addToDag(Planner planner, PipelineImpl.Context context) {
        if (this.emitsWatermarks || this.eventTimePolicy == null) {
            // Single-vertex plan. With no policy set, the function receives the
            // "no event time" policy instead of null.
            ProcessorMetaSupplier metaSupplier = this.metaSupplierFn.apply(
                    this.eventTimePolicy != null ? this.eventTimePolicy : EventTimePolicy.noEventTime());
            determineLocalParallelism(metaSupplier.preferredLocalParallelism(), context, false);
            planner.addVertex(this, name(), determinedLocalParallelism(), metaSupplier);
            return;
        }

        // Two-vertex plan:
        //
        //   source vertex  --isolated-->  "<name>-add-timestamps" vertex
        //
        // Only the second vertex is registered with the planner for this transform;
        // the source vertex is added to the DAG directly.
        String sourceVertexName = name();
        ProcessorMetaSupplier metaSupplier = this.metaSupplierFn.apply(this.eventTimePolicy);
        determineLocalParallelism(metaSupplier.preferredLocalParallelism(), context, false);
        Edge sourceToTimestamps = Edge.between(
                planner.dag.newVertex(sourceVertexName, metaSupplier)
                        .localParallelism(determinedLocalParallelism()),
                planner.addVertex(
                        this,
                        sourceVertexName + "-add-timestamps",
                        determinedLocalParallelism(),
                        Processors.insertWatermarksP(this.eventTimePolicy)).v
        ).isolated();
        planner.dag.edge(sourceToTimestamps);
    }

    /**
     * Returns the event time policy, or {@code null} if none has been set.
     */
    @Nullable
    public EventTimePolicy<? super T> getEventTimePolicy() {
        return this.eventTimePolicy;
    }

    /**
     * Sets the event time policy used when this source is planned.
     *
     * @param eventTimePolicy the policy to apply
     */
    public void setEventTimePolicy(@Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        this.eventTimePolicy = eventTimePolicy;
    }

    /**
     * Returns {@code true} if an event time policy has been set on this source.
     */
    public boolean emitsJetEvents() {
        return this.eventTimePolicy != null;
    }

    @Override
    public boolean supportsNativeTimestamps() {
        return this.supportsNativeTimestamps;
    }

    /**
     * Sets the partition idle timeout.
     *
     * @param timeout the timeout; must be {@code >= 0}, where 0 means disabled
     * @return this instance, for call chaining
     */
    @Override
    public StreamSource<T> setPartitionIdleTimeout(long timeout) {
        Preconditions.checkNotNegative(timeout, "timeout must be >= 0 (0 means disabled)");
        this.partitionIdleTimeout = timeout;
        return this;
    }

    @Override
    public long partitionIdleTimeout() {
        return this.partitionIdleTimeout;
    }

    /**
     * Writes this class's fields in a fixed order; {@link #readObject} must
     * read them back in exactly the same order.
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(this.metaSupplierFn);
        out.writeBoolean(this.isAssignedToStage);
        out.writeBoolean(this.emitsWatermarks);
        out.writeObject(this.eventTimePolicy);
        out.writeBoolean(this.supportsNativeTimestamps);
        out.writeLong(this.partitionIdleTimeout);
    }

    /**
     * Reads the fields in the order {@link #writeObject} wrote them. The
     * meta-supplier function is deserialized inside
     * {@code Util.doWithClassLoader}, using the class loader that
     * {@link ProcessorClassLoaderTLHolder} returns for this transform's name.
     */
    @SuppressWarnings("unchecked") // the stream holds what writeObject stored for these fields
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        this.metaSupplierFn = Util.doWithClassLoader(
                ProcessorClassLoaderTLHolder.get(name()),
                () -> (FunctionEx<? super EventTimePolicy<? super T>, ? extends ProcessorMetaSupplier>)
                        in.readObject());
        this.isAssignedToStage = in.readBoolean();
        this.emitsWatermarks = in.readBoolean();
        this.eventTimePolicy = (EventTimePolicy<? super T>) in.readObject();
        this.supportsNativeTimestamps = in.readBoolean();
        this.partitionIdleTimeout = in.readLong();
    }
}
