Skip to content

Commit

Permalink
Support config for mqtt publish qos max (#971)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows committed Apr 22, 2024
1 parent 4a5f85d commit 225ab3c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.agrona.LangUtil;

import io.aklivity.zilla.runtime.binding.mqtt.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.engine.Configuration;

public class MqttConfiguration extends Configuration
Expand All @@ -44,6 +45,7 @@ public class MqttConfiguration extends Configuration
public static final IntPropertyDef SESSION_EXPIRY_GRACE_PERIOD;
public static final PropertyDef<String> CLIENT_ID;
public static final PropertyDef<IntSupplier> SUBSCRIPTION_ID;
public static final PropertyDef<MqttQoS> PUBLISH_QOS_MAX;
public static final int GENERATED_SUBSCRIPTION_ID_MASK = 0x70;

static
Expand All @@ -66,6 +68,8 @@ public class MqttConfiguration extends Configuration
CLIENT_ID = config.property("client.id");
SUBSCRIPTION_ID = config.property(IntSupplier.class, "subscription.id",
MqttConfiguration::decodeIntSupplier, MqttConfiguration::defaultSubscriptionId);
PUBLISH_QOS_MAX = config.property(MqttQoS.class, "publish.qos.max",
MqttConfiguration::decodePublishQosMax, MqttQoS.EXACTLY_ONCE);
MQTT_CONFIG = config;
}

Expand Down Expand Up @@ -130,12 +134,22 @@ public String clientId()
return CLIENT_ID.get(this);
}


public IntSupplier subscriptionId()
{
return SUBSCRIPTION_ID.get(this);
}

public MqttQoS publishQosMax()
{
return PUBLISH_QOS_MAX.get(this);
}

private static MqttQoS decodePublishQosMax(
String value)
{
return MqttQoS.valueOf(value.toUpperCase());
}

private static IntSupplier decodeIntSupplier(
String fullyQualifiedMethodName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ public final class MqttServerFactory implements MqttStreamFactory
private final Map<MqttPacketType, MqttServerDecoder> decodersByPacketTypeV4;
private final Map<MqttPacketType, MqttServerDecoder> decodersByPacketTypeV5;
private final IntSupplier supplySubscriptionId;
private final MqttQoS publishQosMax;
private final EngineContext context;

private int maximumPacketSize = Integer.MAX_VALUE;
Expand Down Expand Up @@ -520,6 +521,7 @@ public MqttServerFactory(
this.validator = new MqttValidator();
this.utf8Decoder = StandardCharsets.UTF_8.newDecoder();
this.supplySubscriptionId = config.subscriptionId();
this.publishQosMax = config.publishQosMax();
final Optional<String16FW> clientId = Optional.ofNullable(config.clientId()).map(String16FW::new);
this.supplyClientId = clientId.isPresent() ? clientId::get : () -> new String16FW(UUID.randomUUID().toString());
this.decodePacketTypeByVersion = new Int2ObjectHashMap<>();
Expand Down Expand Up @@ -2938,7 +2940,7 @@ else if (this.authField.equals(MqttConnectProperty.PASSWORD))
.session(s -> s
.flags(connectFlags & (CLEAN_START_FLAG_MASK | WILL_FLAG_MASK))
.expiry(sessionExpiry)
.publishQosMax(MqttQoS.EXACTLY_ONCE.value())
.publishQosMax(publishQosMax.value())
.capabilities(capabilities)
.clientId(clientId));

Expand Down

0 comments on commit 225ab3c

Please sign in to comment.