package org.eclipse.hono.client.impl;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.log.Fields;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonSender;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.MessageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.0.4.jar:org/eclipse/hono/client/impl/AbstractSender.class */
/**
 * Base class for senders that publish AMQP messages over a {@link ProtonSender} link
 * and report the outcome of each transfer to an OpenTracing {@link Span}.
 * <p>
 * Subclasses decide how a message is actually transferred ({@link #sendMessage(Message, Span)})
 * and how the tracing span for a send operation is created
 * ({@link #startSpan(SpanContext, Message)}).
 */
public abstract class AbstractSender extends AbstractHonoClient implements MessageSender {

    /** Counter used to build the message IDs assigned in {@link #sendMessageAndWaitForOutcome(Message, Span)}. */
    protected static final AtomicLong MESSAGE_COUNTER = new AtomicLong();

    /** Logger named after the concrete subclass. */
    protected final Logger log;
    /** The tenant this sender has been created for, never {@code null}. */
    protected final String tenantId;
    /** The address that is recorded as the message bus destination of sent messages. */
    protected final String targetAddress;

    /** One-shot handler to notify when the link's send queue has drained, {@code null} if none is pending. */
    private Handler<Void> drainHandler;

    /**
     * Creates a sender for an existing link.
     * <p>
     * If the link is already open, the capabilities offered by the peer are captured.
     * <p>
     * NOTE(review): the decompiler reports that this constructor was originally {@code protected};
     * the access modifier is left as decompiled.
     *
     * @param honoConnection The connection to use, passed on to the super class.
     * @param protonSender The link to send messages over.
     * @param tenantId The tenant that the sender is scoped to.
     * @param targetAddress The target address of the sender, may be {@code null}.
     * @throws NullPointerException if the link or the tenant ID is {@code null}.
     */
    public AbstractSender(HonoConnection honoConnection, ProtonSender protonSender, String tenantId, String targetAddress) {
        super(honoConnection);
        this.log = LoggerFactory.getLogger(getClass());
        this.sender = Objects.requireNonNull(protonSender);
        this.tenantId = Objects.requireNonNull(tenantId);
        this.targetAddress = targetAddress;
        if (protonSender.isOpen()) {
            this.offeredCapabilities = Optional.ofNullable(protonSender.getRemoteOfferedCapabilities())
                    .map(capabilities -> Collections.unmodifiableList(Arrays.asList(capabilities)))
                    .orElse(Collections.emptyList());
        }
    }

    /**
     * Gets the credit currently available on the link.
     *
     * @return The link's credit or 0 if there is no link.
     */
    @Override // org.eclipse.hono.client.CreditBasedSender
    public final int getCredit() {
        if (this.sender == null) {
            return 0;
        }
        return this.sender.getCredit();
    }

    /**
     * Registers a one-shot handler that is invoked the next time the link reports
     * that its send queue has drained.
     *
     * @param handler The handler to notify.
     * @throws IllegalStateException if a handler is already registered and has not been notified yet.
     * @throws NullPointerException if the handler is {@code null}.
     */
    @Override // org.eclipse.hono.client.CreditBasedSender
    public final void sendQueueDrainHandler(Handler<Void> handler) {
        if (this.drainHandler != null) {
            throw new IllegalStateException("already waiting for replenishment with credit");
        }
        this.drainHandler = Objects.requireNonNull(handler);
        this.sender.sendQueueDrainHandler(replenishedSender -> {
            this.log.trace("sender has received FLOW [credits: {}, queued:{}]",
                    replenishedSender.getCredit(), replenishedSender.getQueued());
            // clear the registration before notifying so that the handler may register a new one
            final Handler<Void> pendingHandler = this.drainHandler;
            this.drainHandler = null;
            if (pendingHandler != null) {
                pendingHandler.handle(null);
            }
        });
    }

    /**
     * Closes the links of this sender.
     *
     * @param handler The handler to notify (always with a succeeded result) once the links are closed.
     * @throws NullPointerException if the handler is {@code null}.
     */
    @Override // org.eclipse.hono.client.MessageSender
    public final void close(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        this.log.debug("closing sender ...");
        closeLinks(closed -> handler.handle(Future.succeededFuture()));
    }

    /**
     * Checks whether the underlying link is open.
     *
     * @return {@code true} if there is a link and it is open.
     */
    @Override // org.eclipse.hono.client.MessageSender
    public final boolean isOpen() {
        // guard against a missing link in the same way as getCredit() does
        return this.sender != null && this.sender.isOpen();
    }

    /**
     * Sends a message without a parent tracing context.
     *
     * @param message The message to send.
     * @return The outcome of {@link #send(Message, SpanContext)} invoked with a {@code null} context.
     */
    @Override // org.eclipse.hono.client.MessageSender
    public final Future<ProtonDelivery> send(Message message) {
        return send(message, (SpanContext) null);
    }

    /**
     * Sends a message if the link's send queue is not full.
     * <p>
     * A span is started for the operation, tagged with the target address, tenant and
     * device ID, and its context is injected into the message before the message is
     * handed to {@link #sendMessage(Message, Span)}.
     *
     * @param message The message to send.
     * @param spanContext The context to pass to {@link #startSpan(SpanContext, Message)}, may be {@code null}.
     * @return A future with the outcome of {@link #sendMessage(Message, Span)}, or a future failed
     *         with a {@link ServerErrorException} (503) if the send queue is full.
     * @throws NullPointerException if the message is {@code null}.
     */
    @Override // org.eclipse.hono.client.MessageSender
    public final Future<ProtonDelivery> send(Message message, SpanContext spanContext) {
        Objects.requireNonNull(message);
        final Span span = startSpan(spanContext, message);
        Tags.MESSAGE_BUS_DESTINATION.set(span, this.targetAddress);
        span.setTag(MessageHelper.APP_PROPERTY_TENANT_ID, this.tenantId);
        span.setTag(MessageHelper.APP_PROPERTY_DEVICE_ID, MessageHelper.getDeviceId(message));
        TracingHelper.injectSpanContext(this.connection.getTracer(), span.context(), message);
        return this.connection.executeOrRunOnContext(result -> {
            if (this.sender.sendQueueFull()) {
                final ServerErrorException noCredit = new ServerErrorException(503, "no credit available");
                logError(span, noCredit);
                span.finish();
                result.fail(noCredit);
            } else {
                sendMessage(message, span).setHandler(result);
            }
        });
    }

    /**
     * Transfers a message to the peer.
     *
     * @param message The message to send.
     * @param span The span tracking the send operation.
     * @return A future indicating the outcome of the transfer.
     */
    protected abstract Future<ProtonDelivery> sendMessage(Message message, Span span);

    /**
     * Starts a span for sending a message without a parent context.
     *
     * @param message The message to be sent.
     * @return The result of {@link #startSpan(SpanContext, Message)} invoked with a {@code null} context.
     */
    protected final Span startSpan(Message message) {
        return startSpan(null, message);
    }

    /**
     * Starts a span for sending a message.
     *
     * @param spanContext The context to start the span in, may be {@code null}.
     * @param message The message to be sent.
     * @return The started span.
     */
    protected abstract Span startSpan(SpanContext spanContext, Message message);

    /**
     * NOTE(review): presumably returns the address to put into a message's <em>to</em> field
     * for the given identifier; this class does not call the method - confirm against subclasses.
     * The decompiler reports that this method was originally {@code protected}.
     *
     * @param str The value to derive the address from.
     * @return The address.
     */
    public abstract String getTo(String str);

    /**
     * Sends a message and waits for the peer to settle the delivery.
     * <p>
     * The message is assigned a generated message ID. If a positive send message timeout is
     * configured on the connection and no delivery update arrives in time, the returned future
     * is failed with a {@link ServerErrorException} (503). The given span is finished once the
     * outcome is known.
     * <p>
     * NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param message The message to send.
     * @param span The span tracking the send operation.
     * @return A future that is succeeded with the delivery if the peer has accepted the message.
     *         Otherwise the future is failed with
     *         <ul>
     *         <li>a {@link ClientErrorException} (400) if the peer rejected the message,</li>
     *         <li>a {@link ClientErrorException} (404) if the peer modified the delivery and
     *         flagged it as undeliverable here,</li>
     *         <li>a {@link ServerErrorException} (503) if the peer released the message, modified it
     *         without flagging it as undeliverable here, or the wait timed out,</li>
     *         <li>a {@link ServerErrorException} (500) if the peer did not settle the message or
     *         settled it with any other (or no) delivery state.</li>
     *         </ul>
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public Future<ProtonDelivery> sendMessageAndWaitForOutcome(Message message, Span span) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(span);

        final Future<ProtonDelivery> result = Future.future();
        final String messageId = String.format("%s-%d", getClass().getSimpleName(), MESSAGE_COUNTER.getAndIncrement());
        message.setMessageId(messageId);
        logMessageIdAndSenderInfo(span, messageId);

        final long timeoutMillis = this.connection.getConfig().getSendMessageTimeout();
        // only arm a timer if a timeout has been configured
        final Long timerId = timeoutMillis > 0
                ? Long.valueOf(this.connection.getVertx().setTimer(timeoutMillis, id -> {
                    if (result.isComplete()) {
                        return;
                    }
                    final ServerErrorException timedOut = new ServerErrorException(503,
                            "waiting for delivery update timed out after " + timeoutMillis + "ms");
                    this.log.debug("waiting for delivery update timed out for message [message ID: {}] after {}ms",
                            messageId, timeoutMillis);
                    result.fail(timedOut);
                }))
                : null;

        this.sender.send(message, delivery -> {
            if (timerId != null) {
                this.connection.getVertx().cancelTimer(timerId);
            }
            final DeliveryState remoteState = delivery.getRemoteState();
            if (result.isComplete()) {
                this.log.debug("ignoring received delivery update for message [message ID: {}]: waiting for the update has already timed out", messageId);
                return;
            }
            if (!delivery.remotelySettled()) {
                // the remote state may be missing, do not dereference it blindly
                final String stateName = remoteState == null ? "null" : remoteState.getClass().getSimpleName();
                this.log.debug("peer did not settle message [message ID: {}, remote state: {}], failing delivery", messageId, stateName);
                result.fail(new ServerErrorException(500, "peer did not settle message, failing delivery"));
                return;
            }
            logUpdatedDeliveryState(span, messageId, delivery);
            if (remoteState instanceof Accepted) {
                result.complete(delivery);
                return;
            }
            final Throwable failure;
            if (remoteState instanceof Rejected) {
                final Rejected rejected = (Rejected) remoteState;
                failure = rejected.getError() == null
                        ? new ClientErrorException(400)
                        : new ClientErrorException(400, rejected.getError().getDescription());
            } else if (remoteState instanceof Released) {
                failure = new ServerErrorException(503);
            } else if (remoteState instanceof Modified) {
                // the undeliverable-here flag is optional and may therefore be null
                failure = Boolean.TRUE.equals(((Modified) remoteState).getUndeliverableHere())
                        ? new ClientErrorException(404)
                        : new ServerErrorException(503);
            } else {
                // never fail the future with a null cause
                failure = new ServerErrorException(500, "peer settled message with unexpected remote state: " + remoteState);
            }
            result.fail(failure);
        });
        this.log.trace("sent message [ID: {}], remaining credit: {}, queued messages: {}",
                messageId, this.sender.getCredit(), this.sender.getQueued());

        return result.map(delivery -> {
            this.log.trace("message [ID: {}] accepted by peer", messageId);
            Tags.HTTP_STATUS.set(span, 202);
            span.finish();
            return delivery;
        }).recover(failure -> {
            TracingHelper.logError(span, failure);
            Tags.HTTP_STATUS.set(span, ServiceInvocationException.extractStatusCode(failure));
            span.finish();
            return Future.failedFuture(failure);
        });
    }

    /**
     * Logs the message ID together with the link's current credit and QoS to a span.
     * <p>
     * NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param span The span to log to.
     * @param messageId The ID of the message being sent.
     */
    public final void logMessageIdAndSenderInfo(Span span, String messageId) {
        final HashMap<String, Object> items = new HashMap<>(3);
        items.put(TracingHelper.TAG_MESSAGE_ID.getKey(), messageId);
        items.put(TracingHelper.TAG_CREDIT.getKey(), this.sender.getCredit());
        items.put(TracingHelper.TAG_QOS.getKey(), this.sender.getQoS().toString());
        span.log(items);
    }

    /**
     * Logs the remote state of a delivery to a span and sets the span's HTTP status tag
     * accordingly (202 accepted, 400 rejected, 503 released, 404 or 503 modified).
     * <p>
     * Any state other than <em>accepted</em> is logged to the span as an error.
     * <p>
     * NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param span The span to log to.
     * @param messageId The ID of the message that the delivery belongs to.
     * @param protonDelivery The updated delivery.
     * @throws NullPointerException if the span or the delivery is {@code null}.
     */
    public final void logUpdatedDeliveryState(Span span, String messageId, ProtonDelivery protonDelivery) {
        Objects.requireNonNull(span);
        Objects.requireNonNull(protonDelivery);
        final DeliveryState remoteState = protonDelivery.getRemoteState();
        if (remoteState instanceof Accepted) {
            this.log.trace("message [message ID: {}] accepted by peer", messageId);
            span.log("message accepted by peer");
            Tags.HTTP_STATUS.set(span, 202);
            return;
        }
        final HashMap<String, Object> errorItems = new HashMap<>();
        if (remoteState instanceof Rejected) {
            final Rejected rejected = (Rejected) remoteState;
            Tags.HTTP_STATUS.set(span, 400);
            if (rejected.getError() == null) {
                this.log.debug("message [message ID: {}] rejected by peer", messageId);
                errorItems.put(Fields.MESSAGE, "message rejected by peer");
            } else {
                this.log.debug("message [message ID: {}] rejected by peer: {}, {}", messageId,
                        rejected.getError().getCondition(), rejected.getError().getDescription());
                errorItems.put(Fields.MESSAGE, String.format("message rejected by peer: %s, %s",
                        rejected.getError().getCondition(), rejected.getError().getDescription()));
            }
        } else if (remoteState instanceof Released) {
            this.log.debug("message [message ID: {}] not accepted by peer, remote state: {}", messageId,
                    remoteState.getClass().getSimpleName());
            Tags.HTTP_STATUS.set(span, 503);
            errorItems.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
        } else if (remoteState instanceof Modified) {
            final Modified modified = (Modified) remoteState;
            this.log.debug("message [message ID: {}] not accepted by peer, remote state: {}", messageId, modified);
            // the undeliverable-here flag is optional and may therefore be null
            Tags.HTTP_STATUS.set(span, Boolean.TRUE.equals(modified.getUndeliverableHere()) ? 404 : 503);
            errorItems.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
        } else {
            // unexpected or missing state: still leave a meaningful entry instead of an empty error log
            this.log.debug("message [message ID: {}] not accepted by peer, remote state: {}", messageId, remoteState);
            errorItems.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
        }
        TracingHelper.logError(span, errorItems);
    }
}
