Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ws over h2 #220

Draft
wants to merge 12 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public enum Http2Setting
MAX_CONCURRENT_STREAMS(3),
INITIAL_WINDOW_SIZE(4),
MAX_FRAME_SIZE(5),
MAX_HEADER_LIST_SIZE(6);
MAX_HEADER_LIST_SIZE(6),
ENABLE_CONNECT_PROTOCOL(8);

private final int id;

Expand All @@ -49,6 +50,7 @@ static Http2Setting get(
case 4: return INITIAL_WINDOW_SIZE;
case 5: return MAX_FRAME_SIZE;
case 6: return MAX_HEADER_LIST_SIZE;
case 8: return ENABLE_CONNECT_PROTOCOL;
default: return UNKNOWN;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.aklivity.zilla.runtime.binding.http.internal.codec;

import static io.aklivity.zilla.runtime.binding.http.internal.codec.Http2FrameType.SETTINGS;
import static io.aklivity.zilla.runtime.binding.http.internal.codec.Http2Setting.ENABLE_CONNECT_PROTOCOL;
import static io.aklivity.zilla.runtime.binding.http.internal.codec.Http2Setting.ENABLE_PUSH;
import static io.aklivity.zilla.runtime.binding.http.internal.codec.Http2Setting.HEADER_TABLE_SIZE;
import static io.aklivity.zilla.runtime.binding.http.internal.codec.Http2Setting.INITIAL_WINDOW_SIZE;
Expand Down Expand Up @@ -239,6 +240,13 @@ public Builder maxHeaderListSize(long size)
return this;
}

public Builder enableConnectProtocol()
jfallows marked this conversation as resolved.
Show resolved Hide resolved
{
addSetting(x -> x.setting(ENABLE_CONNECT_PROTOCOL.id(), 1L));
return this;
}


private Builder addSetting(Consumer<Http2SettingFW.Builder> mutator)
{
settingsRW.item(mutator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,15 @@ public final class HttpServerFactory implements HttpStreamFactory
private static final String HEADER_NAME_ACCESS_CONTROL_EXPOSE_HEADERS = "access-control-expose-headers";
private static final String HEADER_NAME_METHOD = ":method";
private static final String HEADER_NAME_ORIGIN = "origin";
private static final String HEADER_NAME_PROTOCOL = ":protocol";
private static final String HEADER_NAME_SCHEME = ":scheme";
private static final String HEADER_NAME_AUTHORITY = ":authority";
private static final String HEADER_NAME_CONTENT_TYPE = "content-type";
private static final String HEADER_NAME_CONTENT_LENGTH = "content-length";

private static final String METHOD_NAME_OPTIONS = "OPTIONS";
private static final String METHOD_NAME_POST = "POST";
private static final String METHOD_NAME_CONNECT = "CONNECT";

private static final String CHALLENGE_RESPONSE_METHOD = METHOD_NAME_POST;
private static final String CHALLENGE_RESPONSE_CONTENT_TYPE = "application/x-challenge-response";
Expand All @@ -234,6 +236,7 @@ public final class HttpServerFactory implements HttpStreamFactory
private static final String8FW HEADER_CONTENT_LENGTH = new String8FW("content-length");
private static final String8FW HEADER_METHOD = new String8FW(":method");
private static final String8FW HEADER_PATH = new String8FW(":path");
private static final String8FW HEADER_PROTOCOL = new String8FW(":protocol");
private static final String8FW HEADER_SCHEME = new String8FW(":scheme");
private static final String8FW HEADER_SERVER = new String8FW("server");
private static final String8FW HEADER_STATUS = new String8FW(":status");
Expand Down Expand Up @@ -5313,6 +5316,7 @@ private void doEncodeSettings(
.maxConcurrentStreams(initialSettings.maxConcurrentStreams)
.initialWindowSize(initialSettings.initialWindowSize)
.maxHeaderListSize(initialSettings.maxHeaderListSize)
.enableConnectProtocol()
.build();

doNetworkReservedData(traceId, authorization, 0L, http2Settings);
Expand Down Expand Up @@ -6206,6 +6210,7 @@ private final class Http2HeadersDecoder
private int method;
private int scheme;
private int path;
private int protocol;

Http2ErrorCode connectionError;
Http2ErrorCode streamError;
Expand Down Expand Up @@ -6244,6 +6249,21 @@ void decodeHeaders(
// a CONNECT request (Section 8.3). An HTTP request that omits
// mandatory pseudo-header fields is malformed
if (!error() && (method != 1 || scheme != 1 || path != 1))
{
streamError = Http2ErrorCode.PROTOCOL_ERROR;
return;
}

boolean isConnect = METHOD_NAME_CONNECT.equals(headers.get(HEADER_NAME_METHOD));

// A CONNECT request MAY include a ":protocol" pseudo-header, and
// a ":protocol" pseudo-header must not appear in a non-CONNECT request.
// TODO: Add test
if (!isConnect && protocol > 0 || isConnect && protocol > 1 ||
// On requests that contain the :protocol pseudo-header field, the :scheme
// and :path pseudo-header fields of the target URI MUST also be included
// (RFC8441, Section 4).
protocol == 1 && (scheme != 1 || path != 1 || headers.get(HEADER_NAME_PROTOCOL).isBlank()))
{
streamError = Http2ErrorCode.PROTOCOL_ERROR;
}
Expand Down Expand Up @@ -6344,6 +6364,7 @@ private void validatePseudoHeaders(
return;
}
// request pseudo-header fields MUST be one of :authority, :method, :path, :scheme,
// and may be :protocol if the :method pseudo-header is CONNECT (RFC8441)
int index = context.index(name);
switch (index)
{
Expand All @@ -6366,8 +6387,15 @@ private void validatePseudoHeaders(
scheme++;
break;
default:
streamError = Http2ErrorCode.PROTOCOL_ERROR;
return;
if (HEADER_PROTOCOL.value().compareTo(name) == 0)
{
protocol++;
break;
}
else
{
streamError = Http2ErrorCode.PROTOCOL_ERROR;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are just tracking the presence and count of the pseudo-headers, so they can be checked in decodeHeaders. So we likely need to add a count for protocol and include in the check.

Note we are validating the pseudo-headers themselves, not the values.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I misread this part of the spec in rfc8441:

The :protocol pseudo-header field MUST be included in the CONNECT request, and it MUST have a value of websocket to initiate a WebSocket connection on an HTTP/2 stream

I now think it means that if we have a CONNECT request, before we create a websocket connection we must ensure that the request also contains the :protocol pseudo-header and it must have the value websocket.

Maybe we don't need to do any validation in the http2 binding in relation to a connect request.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until we find a better place for it, I have added a check to allow a :protocol pseudo-header in a CONNECT request which is otherwise not allowed by rfc7540. This seems to make sense to be in the http2 binding because the binding is always advertising the ENABLE_CONNECT_PROTOCOL setting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, just make sure you are doing the simple count here, and the enforcement in decodeHeaders, aligned with approach for other checks.

}
}
else
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.http.internal.streams.rfc8441.client;

import static io.aklivity.zilla.runtime.binding.http.internal.HttpConfigurationTest.HTTP_CONCURRENT_STREAMS_NAME;
import static io.aklivity.zilla.runtime.binding.http.internal.HttpConfigurationTest.HTTP_STREAM_INITIAL_WINDOW_NAME;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.rules.RuleChain.outerRule;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;

import io.aklivity.zilla.runtime.engine.EngineConfiguration;
import io.aklivity.zilla.runtime.engine.test.EngineRule;
import io.aklivity.zilla.runtime.engine.test.annotation.Configuration;
import io.aklivity.zilla.runtime.engine.test.annotation.Configure;

public class ConnectIT
{
private final K3poRule k3po = new K3poRule()
.addScriptRoot("app", "io/aklivity/zilla/specs/binding/http/streams/application/rfc8441/connect/")
.addScriptRoot("net", "io/aklivity/zilla/specs/binding/http/streams/network/rfc8441/connect/");

private final TestRule timeout = new DisableOnDebug(new Timeout(10, SECONDS));

private final EngineRule engine = new EngineRule()
.directory("target/zilla-itests")
.countersBufferCapacity(8192)
.configurationRoot("io/aklivity/zilla/specs/binding/http/config/v2")
.external("net0")
.configure(EngineConfiguration.ENGINE_DRAIN_ON_CLOSE, false)
.clean();

@Rule
public final TestRule chain = outerRule(engine).around(k3po).around(timeout);

@Test
@Configuration("client.yaml")
@Specification({
"${app}/client",
"${net}/server"})
@Configure(name = HTTP_CONCURRENT_STREAMS_NAME, value = "100")
@Configure(name = HTTP_STREAM_INITIAL_WINDOW_NAME, value = "65535")
public void shouldConnect() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.http.internal.streams.rfc8441.server;

import static io.aklivity.zilla.runtime.binding.http.internal.HttpConfiguration.HTTP_STREAM_INITIAL_WINDOW;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.rules.RuleChain.outerRule;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;

import io.aklivity.zilla.runtime.engine.EngineConfiguration;
import io.aklivity.zilla.runtime.engine.test.EngineRule;
import io.aklivity.zilla.runtime.engine.test.annotation.Configuration;

public class ConnectIT
{
private final K3poRule k3po = new K3poRule()
.addScriptRoot("app", "io/aklivity/zilla/specs/binding/http/streams/application/rfc8441")
.addScriptRoot("net", "io/aklivity/zilla/specs/binding/http/streams/network/rfc8441");

private final TestRule timeout = new DisableOnDebug(new Timeout(10, SECONDS));

private final EngineRule engine = new EngineRule()
.directory("target/zilla-itests")
.countersBufferCapacity(8192)
.configurationRoot("io/aklivity/zilla/specs/binding/http/config/v2")
.external("app0")
.configure(EngineConfiguration.ENGINE_DRAIN_ON_CLOSE, false)
.configure(HTTP_STREAM_INITIAL_WINDOW, 65535)
.clean();

@Rule
public final TestRule chain = outerRule(engine).around(k3po).around(timeout);

@Test
@Configuration("server.yaml")
@Specification({
"${app}/connect/server",
"${net}/connect/client"})
public void shouldConnect() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/invalid.header/client"})
@Ignore("Investigate test failure")
public void shouldResetStreamForInvalidPseudoHeader() throws Exception
{
k3po.finish();
}
}
Loading