Browse Source

Merge pull request #350 from sergiocampama/http1

Add gRPC Web support.
Tim Burks 7 years ago
parent
commit
ac1939e98b

+ 4 - 0
.gitignore

@@ -8,7 +8,11 @@ build
 /protoc-gen-swiftgrpc
 third_party/**
 /Echo
+/EchoNIO
 /test.out
 /echo.pid
 /SwiftGRPC.xcodeproj
 Package.resolved
+Examples/EchoWeb/dist
+Examples/EchoWeb/node_modules
+Examples/EchoWeb/package-lock.json

+ 181 - 0
Examples/EchoWeb/Generated/echo_grpc_web_pb.js

@@ -0,0 +1,181 @@
+/**
+ * @fileoverview gRPC-Web generated client stub for echo
+ * @enhanceable
+ * @public
+ */
+
+// GENERATED CODE -- DO NOT EDIT!
+
+
+
+const grpc = {};
+grpc.web = require('grpc-web');
+
+const proto = {};
+proto.echo = require('./echo_pb.js');
+
+/**
+ * @param {string} hostname
+ * @param {?Object} credentials
+ * @param {?Object} options
+ * @constructor
+ * @struct
+ * @final
+ */
+proto.echo.EchoClient =
+    function(hostname, credentials, options) {
+  if (!options) options = {};
+  options['format'] = 'text';
+
+  /**
+   * @private @const {!grpc.web.GrpcWebClientBase} The client
+   */
+  this.client_ = new grpc.web.GrpcWebClientBase(options);
+
+  /**
+   * @private @const {string} The hostname
+   */
+  this.hostname_ = hostname;
+
+  /**
+   * @private @const {?Object} The credentials to be used to connect
+   *    to the server
+   */
+  this.credentials_ = credentials;
+
+  /**
+   * @private @const {?Object} Options for the client
+   */
+  this.options_ = options;
+};
+
+
+/**
+ * @param {string} hostname
+ * @param {?Object} credentials
+ * @param {?Object} options
+ * @constructor
+ * @struct
+ * @final
+ */
+proto.echo.EchoPromiseClient =
+    function(hostname, credentials, options) {
+  if (!options) options = {};
+  options['format'] = 'text';
+
+  /**
+   * @private @const {!proto.echo.EchoClient} The delegate callback based client
+   */
+  this.delegateClient_ = new proto.echo.EchoClient(
+      hostname, credentials, options);
+
+};
+
+
+/**
+ * @const
+ * @type {!grpc.web.AbstractClientBase.MethodInfo<
+ *   !proto.echo.EchoRequest,
+ *   !proto.echo.EchoResponse>}
+ */
+const methodInfo_Echo_Get = new grpc.web.AbstractClientBase.MethodInfo(
+  proto.echo.EchoResponse,
+  /** @param {!proto.echo.EchoRequest} request */
+  function(request) {
+    return request.serializeBinary();
+  },
+  proto.echo.EchoResponse.deserializeBinary
+);
+
+
+/**
+ * @param {!proto.echo.EchoRequest} request The
+ *     request proto
+ * @param {!Object<string, string>} metadata User defined
+ *     call metadata
+ * @param {function(?grpc.web.Error, ?proto.echo.EchoResponse)}
+ *     callback The callback function(error, response)
+ * @return {!grpc.web.ClientReadableStream<!proto.echo.EchoResponse>|undefined}
+ *     The XHR Node Readable Stream
+ */
+proto.echo.EchoClient.prototype.get =
+    function(request, metadata, callback) {
+  return this.client_.rpcCall(this.hostname_ +
+      '/echo.Echo/Get',
+      request,
+      metadata,
+      methodInfo_Echo_Get,
+      callback);
+};
+
+
+/**
+ * @param {!proto.echo.EchoRequest} request The
+ *     request proto
+ * @param {!Object<string, string>} metadata User defined
+ *     call metadata
+ * @return {!Promise<!proto.echo.EchoResponse>}
+ *     The XHR Node Readable Stream
+ */
+proto.echo.EchoPromiseClient.prototype.get =
+    function(request, metadata) {
+  return new Promise((resolve, reject) => {
+    this.delegateClient_.get(
+      request, metadata, (error, response) => {
+        error ? reject(error) : resolve(response);
+      });
+  });
+};
+
+
+/**
+ * @const
+ * @type {!grpc.web.AbstractClientBase.MethodInfo<
+ *   !proto.echo.EchoRequest,
+ *   !proto.echo.EchoResponse>}
+ */
+const methodInfo_Echo_Expand = new grpc.web.AbstractClientBase.MethodInfo(
+  proto.echo.EchoResponse,
+  /** @param {!proto.echo.EchoRequest} request */
+  function(request) {
+    return request.serializeBinary();
+  },
+  proto.echo.EchoResponse.deserializeBinary
+);
+
+
+/**
+ * @param {!proto.echo.EchoRequest} request The request proto
+ * @param {!Object<string, string>} metadata User defined
+ *     call metadata
+ * @return {!grpc.web.ClientReadableStream<!proto.echo.EchoResponse>}
+ *     The XHR Node Readable Stream
+ */
+proto.echo.EchoClient.prototype.expand =
+    function(request, metadata) {
+  return this.client_.serverStreaming(this.hostname_ +
+      '/echo.Echo/Expand',
+      request,
+      metadata,
+      methodInfo_Echo_Expand);
+};
+
+
+/**
+ * @param {!proto.echo.EchoRequest} request The request proto
+ * @param {!Object<string, string>} metadata User defined
+ *     call metadata
+ * @return {!grpc.web.ClientReadableStream<!proto.echo.EchoResponse>}
+ *     The XHR Node Readable Stream
+ */
+proto.echo.EchoPromiseClient.prototype.expand =
+    function(request, metadata) {
+  return this.delegateClient_.client_.serverStreaming(this.delegateClient_.hostname_ +
+      '/echo.Echo/Expand',
+      request,
+      metadata,
+      methodInfo_Echo_Expand);
+};
+
+
+module.exports = proto.echo;

+ 300 - 0
Examples/EchoWeb/Generated/echo_pb.js

@@ -0,0 +1,300 @@
+/**
+ * @fileoverview
+ * @enhanceable
+ * @suppress {messageConventions} JS Compiler reports an error if a variable or
+ *     field starts with 'MSG_' and isn't a translatable message.
+ * @public
+ */
+// GENERATED CODE -- DO NOT EDIT!
+
+var jspb = require('google-protobuf');
+var goog = jspb;
+var global = Function('return this')();
+
+goog.exportSymbol('proto.echo.EchoRequest', null, global);
+goog.exportSymbol('proto.echo.EchoResponse', null, global);
+
+/**
+ * Generated by JsPbCodeGenerator.
+ * @param {Array=} opt_data Optional initial data array, typically from a
+ * server response, or constructed directly in Javascript. The array is used
+ * in place and becomes part of the constructed object. It is not cloned.
+ * If no data is provided, the constructed object will be empty, but still
+ * valid.
+ * @extends {jspb.Message}
+ * @constructor
+ */
+proto.echo.EchoRequest = function(opt_data) {
+  jspb.Message.initialize(this, opt_data, 0, -1, null, null);
+};
+goog.inherits(proto.echo.EchoRequest, jspb.Message);
+if (goog.DEBUG && !COMPILED) {
+  proto.echo.EchoRequest.displayName = 'proto.echo.EchoRequest';
+}
+
+
+if (jspb.Message.GENERATE_TO_OBJECT) {
+/**
+ * Creates an object representation of this proto suitable for use in Soy templates.
+ * Field names that are reserved in JavaScript and will be renamed to pb_name.
+ * To access a reserved field use, foo.pb_<name>, eg, foo.pb_default.
+ * For the list of reserved names please see:
+ *     com.google.apps.jspb.JsClassTemplate.JS_RESERVED_WORDS.
+ * @param {boolean=} opt_includeInstance Whether to include the JSPB instance
+ *     for transitional soy proto support: http://goto/soy-param-migration
+ * @return {!Object}
+ */
+proto.echo.EchoRequest.prototype.toObject = function(opt_includeInstance) {
+  return proto.echo.EchoRequest.toObject(opt_includeInstance, this);
+};
+
+
+/**
+ * Static version of the {@see toObject} method.
+ * @param {boolean|undefined} includeInstance Whether to include the JSPB
+ *     instance for transitional soy proto support:
+ *     http://goto/soy-param-migration
+ * @param {!proto.echo.EchoRequest} msg The msg instance to transform.
+ * @return {!Object}
+ * @suppress {unusedLocalVariables} f is only used for nested messages
+ */
+proto.echo.EchoRequest.toObject = function(includeInstance, msg) {
+  var f, obj = {
+    text: jspb.Message.getFieldWithDefault(msg, 1, "")
+  };
+
+  if (includeInstance) {
+    obj.$jspbMessageInstance = msg;
+  }
+  return obj;
+};
+}
+
+
+/**
+ * Deserializes binary data (in protobuf wire format).
+ * @param {jspb.ByteSource} bytes The bytes to deserialize.
+ * @return {!proto.echo.EchoRequest}
+ */
+proto.echo.EchoRequest.deserializeBinary = function(bytes) {
+  var reader = new jspb.BinaryReader(bytes);
+  var msg = new proto.echo.EchoRequest;
+  return proto.echo.EchoRequest.deserializeBinaryFromReader(msg, reader);
+};
+
+
+/**
+ * Deserializes binary data (in protobuf wire format) from the
+ * given reader into the given message object.
+ * @param {!proto.echo.EchoRequest} msg The message object to deserialize into.
+ * @param {!jspb.BinaryReader} reader The BinaryReader to use.
+ * @return {!proto.echo.EchoRequest}
+ */
+proto.echo.EchoRequest.deserializeBinaryFromReader = function(msg, reader) {
+  while (reader.nextField()) {
+    if (reader.isEndGroup()) {
+      break;
+    }
+    var field = reader.getFieldNumber();
+    switch (field) {
+    case 1:
+      var value = /** @type {string} */ (reader.readString());
+      msg.setText(value);
+      break;
+    default:
+      reader.skipField();
+      break;
+    }
+  }
+  return msg;
+};
+
+
+/**
+ * Serializes the message to binary data (in protobuf wire format).
+ * @return {!Uint8Array}
+ */
+proto.echo.EchoRequest.prototype.serializeBinary = function() {
+  var writer = new jspb.BinaryWriter();
+  proto.echo.EchoRequest.serializeBinaryToWriter(this, writer);
+  return writer.getResultBuffer();
+};
+
+
+/**
+ * Serializes the given message to binary data (in protobuf wire
+ * format), writing to the given BinaryWriter.
+ * @param {!proto.echo.EchoRequest} message
+ * @param {!jspb.BinaryWriter} writer
+ * @suppress {unusedLocalVariables} f is only used for nested messages
+ */
+proto.echo.EchoRequest.serializeBinaryToWriter = function(message, writer) {
+  var f = undefined;
+  f = message.getText();
+  if (f.length > 0) {
+    writer.writeString(
+      1,
+      f
+    );
+  }
+};
+
+
+/**
+ * optional string text = 1;
+ * @return {string}
+ */
+proto.echo.EchoRequest.prototype.getText = function() {
+  return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 1, ""));
+};
+
+
+/** @param {string} value */
+proto.echo.EchoRequest.prototype.setText = function(value) {
+  jspb.Message.setProto3StringField(this, 1, value);
+};
+
+
+
+/**
+ * Generated by JsPbCodeGenerator.
+ * @param {Array=} opt_data Optional initial data array, typically from a
+ * server response, or constructed directly in Javascript. The array is used
+ * in place and becomes part of the constructed object. It is not cloned.
+ * If no data is provided, the constructed object will be empty, but still
+ * valid.
+ * @extends {jspb.Message}
+ * @constructor
+ */
+proto.echo.EchoResponse = function(opt_data) {
+  jspb.Message.initialize(this, opt_data, 0, -1, null, null);
+};
+goog.inherits(proto.echo.EchoResponse, jspb.Message);
+if (goog.DEBUG && !COMPILED) {
+  proto.echo.EchoResponse.displayName = 'proto.echo.EchoResponse';
+}
+
+
+if (jspb.Message.GENERATE_TO_OBJECT) {
+/**
+ * Creates an object representation of this proto suitable for use in Soy templates.
+ * Field names that are reserved in JavaScript and will be renamed to pb_name.
+ * To access a reserved field use, foo.pb_<name>, eg, foo.pb_default.
+ * For the list of reserved names please see:
+ *     com.google.apps.jspb.JsClassTemplate.JS_RESERVED_WORDS.
+ * @param {boolean=} opt_includeInstance Whether to include the JSPB instance
+ *     for transitional soy proto support: http://goto/soy-param-migration
+ * @return {!Object}
+ */
+proto.echo.EchoResponse.prototype.toObject = function(opt_includeInstance) {
+  return proto.echo.EchoResponse.toObject(opt_includeInstance, this);
+};
+
+
+/**
+ * Static version of the {@see toObject} method.
+ * @param {boolean|undefined} includeInstance Whether to include the JSPB
+ *     instance for transitional soy proto support:
+ *     http://goto/soy-param-migration
+ * @param {!proto.echo.EchoResponse} msg The msg instance to transform.
+ * @return {!Object}
+ * @suppress {unusedLocalVariables} f is only used for nested messages
+ */
+proto.echo.EchoResponse.toObject = function(includeInstance, msg) {
+  var f, obj = {
+    text: jspb.Message.getFieldWithDefault(msg, 1, "")
+  };
+
+  if (includeInstance) {
+    obj.$jspbMessageInstance = msg;
+  }
+  return obj;
+};
+}
+
+
+/**
+ * Deserializes binary data (in protobuf wire format).
+ * @param {jspb.ByteSource} bytes The bytes to deserialize.
+ * @return {!proto.echo.EchoResponse}
+ */
+proto.echo.EchoResponse.deserializeBinary = function(bytes) {
+  var reader = new jspb.BinaryReader(bytes);
+  var msg = new proto.echo.EchoResponse;
+  return proto.echo.EchoResponse.deserializeBinaryFromReader(msg, reader);
+};
+
+
+/**
+ * Deserializes binary data (in protobuf wire format) from the
+ * given reader into the given message object.
+ * @param {!proto.echo.EchoResponse} msg The message object to deserialize into.
+ * @param {!jspb.BinaryReader} reader The BinaryReader to use.
+ * @return {!proto.echo.EchoResponse}
+ */
+proto.echo.EchoResponse.deserializeBinaryFromReader = function(msg, reader) {
+  while (reader.nextField()) {
+    if (reader.isEndGroup()) {
+      break;
+    }
+    var field = reader.getFieldNumber();
+    switch (field) {
+    case 1:
+      var value = /** @type {string} */ (reader.readString());
+      msg.setText(value);
+      break;
+    default:
+      reader.skipField();
+      break;
+    }
+  }
+  return msg;
+};
+
+
+/**
+ * Serializes the message to binary data (in protobuf wire format).
+ * @return {!Uint8Array}
+ */
+proto.echo.EchoResponse.prototype.serializeBinary = function() {
+  var writer = new jspb.BinaryWriter();
+  proto.echo.EchoResponse.serializeBinaryToWriter(this, writer);
+  return writer.getResultBuffer();
+};
+
+
+/**
+ * Serializes the given message to binary data (in protobuf wire
+ * format), writing to the given BinaryWriter.
+ * @param {!proto.echo.EchoResponse} message
+ * @param {!jspb.BinaryWriter} writer
+ * @suppress {unusedLocalVariables} f is only used for nested messages
+ */
+proto.echo.EchoResponse.serializeBinaryToWriter = function(message, writer) {
+  var f = undefined;
+  f = message.getText();
+  if (f.length > 0) {
+    writer.writeString(
+      1,
+      f
+    );
+  }
+};
+
+
+/**
+ * optional string text = 1;
+ * @return {string}
+ */
+proto.echo.EchoResponse.prototype.getText = function() {
+  return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 1, ""));
+};
+
+
+/** @param {string} value */
+proto.echo.EchoResponse.prototype.setText = function(value) {
+  jspb.Message.setProto3StringField(this, 1, value);
+};
+
+
+goog.object.extend(exports, proto.echo);

+ 9 - 0
Examples/EchoWeb/Makefile

@@ -0,0 +1,9 @@
+
+all:
+	npm install
+	npx webpack client.js
+
+clean:
+	rm -rf Packages googleapis .build
+	rm -f Package.pins Echo google.json
+	rm -rf Package.resolved Echo.xcodeproj Echo

+ 17 - 0
Examples/EchoWeb/README.md

@@ -0,0 +1,17 @@
+# Echo gRPC-Web Sample App
+
+The Echo gRPC-Web is a node project that creates a website that
+connects to a Swift gRPC NIO server to display messages. To build
+it, just run `make` inside this directory, and then open the
+`index.html` file in a web browser. Remember to start the Echo
+service by executing `swift run EchoNIO serve` before opening
+`index.html` in the browser.
+
+The proto files were generated by invoking `protoc` with the
+`protoc-gen-grpc-web` plugin as described
+[here](https://github.com/grpc/grpc-web/tree/master/net/grpc/gateway/examples/helloworld#generate-protobuf-messages-and-client-service-stub).
+
+## Dependencies
+
+You'll need to install `npm` in order to compile the Javascript
+code.

+ 33 - 0
Examples/EchoWeb/client.js

@@ -0,0 +1,33 @@
+const {EchoRequest, EchoResponse} = require('./Generated/echo_pb.js');
+const {EchoClient} = require('./Generated/echo_grpc_web_pb.js');
+
+var client = new EchoClient('http://localhost:8080');
+
+function sendMessage(message) {
+  var request = new EchoRequest();
+  request.setText(message);
+
+  client.get(request, {}, (err, response) => {
+    var responseLabel = document.getElementById("response_label")
+    if (err) {
+      responseLabel.innerText = "ERROR: Could not connect to the server."
+    } else {
+      responseLabel.innerText = "Server reply: " + response.getText()
+    }
+  });
+
+  var expandStream = client.expand(request);
+  expandStream.on('data', function(response) {
+    console.log(response.getText());
+  });
+  expandStream.on('end', function(end) {
+    console.log("Expand Stream Ended");
+  });
+
+}
+
+window.addEventListener("DOMContentLoaded", function() {
+  document.getElementById("message_button").addEventListener("click", function() {
+    sendMessage(document.getElementById("input_field").value);
+  });
+}, false);

+ 17 - 0
Examples/EchoWeb/index.html

@@ -0,0 +1,17 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+  <meta charset="UTF-8">
+  <title>Echo gRPC-Web Example</title>
+  <script src="./dist/main.js"></script>
+</head>
+<body>
+  <div>
+    <input type="text" id="input_field" value="Test me!"/>
+    <button type="button" id="message_button">Send Message!</button>
+  </div>
+  <div>
+    <p id="response_label"></p>
+  </div>
+</body>
+</html>

+ 13 - 0
Examples/EchoWeb/package.json

@@ -0,0 +1,13 @@
+{
+  "name": "echo-grpc-web-example",
+  "version": "0.1.0",
+  "description": "Echo gRPC-Web Example",
+  "devDependencies": {
+    "@grpc/proto-loader": "^0.3.0",
+    "google-protobuf": "^3.6.1",
+    "grpc": "^1.15.0",
+    "grpc-web": "^1.0.0",
+    "webpack": "^4.16.5",
+    "webpack-cli": "^3.1.0"
+  }
+}

+ 13 - 2
Makefile

@@ -27,10 +27,10 @@ project-carthage:
 	@ruby fix-project-settings.rb SwiftGRPC-Carthage.xcodeproj || echo "xcodeproj ('sudo gem install xcodeproj') is required in order to generate the Carthage-compatible project!"
 	@ruby patch-carthage-project.rb SwiftGRPC-Carthage.xcodeproj || echo "xcodeproj ('sudo gem install xcodeproj') is required in order to generate the Carthage-compatible project!"
 
-test:	all
+test: all
 	swift test $(CFLAGS)
 
-test-echo:	all
+test-echo: all
 	cp .build/debug/Echo .
 	./Echo serve & /bin/echo $$! > echo.pid
 	./Echo get | tee test.out
@@ -40,6 +40,17 @@ test-echo:	all
 	kill -9 `cat echo.pid`
 	diff -u test.out Sources/Examples/Echo/test.gold
 
+test-echo-nio: all
+	cp .build/debug/EchoNIO .
+	cp .build/debug/Echo .
+	./EchoNIO serve & /bin/echo $$! > echo.pid
+	./Echo get | tee test.out
+	./Echo expand | tee -a test.out
+	./Echo collect | tee -a test.out
+	./Echo update | tee -a test.out
+	kill -9 `cat echo.pid`
+	diff -u test.out Sources/Examples/Echo/test.gold
+
 test-plugin:
 	swift build $(CFLAGS) --product protoc-gen-swiftgrpc
 	protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=TestStubs=true

+ 7 - 1
Package.swift

@@ -23,7 +23,7 @@ var packageDependencies: [Package.Dependency] = [
   .package(url: "https://github.com/apple/swift-nio-zlib-support.git", .upToNextMinor(from: "1.0.0")),
   .package(url: "https://github.com/apple/swift-nio.git", .upToNextMinor(from: "1.12.0")),
   .package(url: "https://github.com/apple/swift-nio-nghttp2-support.git", .upToNextMinor(from: "1.0.0")),
-  .package(url: "https://github.com/apple/swift-nio-http2.git", .revision("dd9339e6310ad8537a271f3ff60a4f3976ca8e4d"))
+  .package(url: "https://github.com/apple/swift-nio-http2.git", .upToNextMinor(from: "0.2.1"))
 ]
 
 var cGRPCDependencies: [Target.Dependency] = []
@@ -75,6 +75,12 @@ let package = Package(
               "SwiftProtobuf",
               "Commander"],
             path: "Sources/Examples/Echo"),
+    .target(name: "EchoNIO",
+            dependencies: [
+              "SwiftGRPCNIO",
+              "SwiftProtobuf",
+              "Commander"],
+            path: "Sources/Examples/EchoNIO"),
     .target(name: "Simple",
             dependencies: ["SwiftGRPC", "Commander"],
             path: "Sources/Examples/Simple"),

+ 71 - 0
Sources/Examples/EchoNIO/EchoProviderNIO.swift

@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import NIO
+import SwiftGRPCNIO
+
+class EchoProviderNIO: Echo_EchoProvider_NIO {
+  func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture<Echo_EchoResponse> {
+    var response = Echo_EchoResponse()
+    response.text = "Swift echo get: " + request.text
+    return context.eventLoop.newSucceededFuture(result: response)
+  }
+
+  func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<GRPCStatus> {
+    var endOfSendOperationQueue = context.eventLoop.newSucceededFuture(result: ())
+    let parts = request.text.components(separatedBy: " ")
+    for (i, part) in parts.enumerated() {
+      var response = Echo_EchoResponse()
+      response.text = "Swift echo expand (\(i)): \(part)"
+      endOfSendOperationQueue = endOfSendOperationQueue.then { context.sendResponse(response) }
+    }
+    return endOfSendOperationQueue.map { GRPCStatus.ok }
+  }
+
+  func collect(context: UnaryResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
+    var parts: [String] = []
+    return context.eventLoop.newSucceededFuture(result: { event in
+      switch event {
+      case .message(let message):
+        parts.append(message.text)
+
+      case .end:
+        var response = Echo_EchoResponse()
+        response.text = "Swift echo collect: " + parts.joined(separator: " ")
+        context.responsePromise.succeed(result: response)
+      }
+    })
+  }
+
+  func update(context: StreamingResponseCallContext<Echo_EchoResponse>) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
+    var endOfSendOperationQueue = context.eventLoop.newSucceededFuture(result: ())
+    var count = 0
+    return context.eventLoop.newSucceededFuture(result: { event in
+      switch event {
+      case .message(let message):
+        var response = Echo_EchoResponse()
+        response.text = "Swift echo update (\(count)): \(message.text)"
+        endOfSendOperationQueue = endOfSendOperationQueue.then { context.sendResponse(response) }
+        count += 1
+
+      case .end:
+        endOfSendOperationQueue
+          .map { GRPCStatus.ok }
+          .cascade(promise: context.statusPromise)
+      }
+    })
+  }
+}

+ 1 - 0
Sources/Examples/EchoNIO/Generated/echo.pb.swift

@@ -0,0 +1 @@
+../../../../Tests/SwiftGRPCNIOTests/echo.pb.swift

+ 1 - 0
Sources/Examples/EchoNIO/Generated/echo_nio.grpc.swift

@@ -0,0 +1 @@
+../../../../Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift

+ 11 - 0
Sources/Examples/EchoNIO/Makefile

@@ -0,0 +1,11 @@
+all:
+	swift build -c debug --product EchoNIO
+	cp .build/debug/EchoNIO .
+
+project:
+	swift package generate-xcodeproj
+
+clean :
+	rm -rf Packages googleapis .build
+	rm -f Package.pins Echo google.json
+	rm -rf Package.resolved EchoNIO.xcodeproj EchoNIO

+ 6 - 0
Sources/Examples/EchoNIO/README.md

@@ -0,0 +1,6 @@
+# EchoNIO, a gRPC NIO Sample App
+
+This directory contains a simple echo server that demonstrates
+all four gRPC API styles (Unary, Server Streaming, Client
+Streaming, and Bidirectional Streaming) using the NIO based
+Swift gRPC implementation.

+ 7 - 0
Sources/Examples/EchoNIO/RUNME

@@ -0,0 +1,7 @@
+#!/bin/sh
+#
+# Use this to run the swift-proto generator
+#
+protoc echo.proto \
+    --swift_out=Generated \
+    --swiftgrpc_out=Client=false,Server=true,NIO=true:Generated

+ 1 - 0
Sources/Examples/EchoNIO/echo.proto

@@ -0,0 +1 @@
+../Echo/echo.proto

+ 51 - 0
Sources/Examples/EchoNIO/main.swift

@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Commander
+import Dispatch
+import Foundation
+import NIO
+import SwiftGRPCNIO
+
+// Common flags and options
+func addressOption(_ address: String) -> Option<String> {
+  return Option("address", default: address, description: "address of server")
+}
+
+let portOption = Option("port",
+                        default: "8080",
+                        description: "port of server")
+
+Group {
+  $0.command("serve",
+             addressOption("0.0.0.0"),
+             portOption,
+             description: "Run an echo server.") { address, port in
+    let sem = DispatchSemaphore(value: 0)
+    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
+
+    print("starting insecure server")
+    _ = try! GRPCServer.start(hostname: address,
+                          port: Int(port)!,
+                          eventLoopGroup: eventLoopGroup,
+                          serviceProviders: [EchoProviderNIO()])
+      .wait()
+
+    // This blocks to keep the main thread from finishing while the server runs,
+    // but the server never exits. Kill the process to stop it.
+    _ = sem.wait()
+  }
+
+}.run()

+ 8 - 4
Sources/SwiftGRPCNIO/GRPCChannelHandler.swift

@@ -37,7 +37,7 @@ public final class GRPCChannelHandler {
 extension GRPCChannelHandler: ChannelInboundHandler {
   public typealias InboundIn = RawGRPCServerRequestPart
   public typealias OutboundOut = RawGRPCServerResponsePart
-  
+
   public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
     let requestPart = self.unwrapInboundIn(data)
     switch requestPart {
@@ -61,9 +61,13 @@ extension GRPCChannelHandler: ChannelInboundHandler {
         assert(handlerWasRemoved)
 
         ctx.pipeline.add(handler: callHandler, after: codec).whenComplete {
-          var responseHeaders = HTTPHeaders()
-          responseHeaders.add(name: "content-type", value: "application/grpc")
-          ctx.write(self.wrapOutboundOut(.headers(responseHeaders)), promise: nil)
+          // Send the .headers event back to begin the headers flushing for the response.
+          // At this point, which headers should be returned is not known, as the content type is
+          // processed in HTTP1ToRawGRPCServerCodec. At the same time the HTTP1ToRawGRPCServerCodec
+          // handler doesn't have the data to determine whether headers should be returned, as it is
+          // this handler that checks whether the stub for the requested Service/Method is implemented.
+          // This likely signals that the architecture for these handlers could be improved.
+          ctx.write(self.wrapOutboundOut(.headers(HTTPHeaders())), promise: nil)
         }
       }
 

+ 5 - 10
Sources/SwiftGRPCNIO/GRPCServer.swift

@@ -22,16 +22,11 @@ public final class GRPCServer {
 
       // Set the handlers that are applied to the accepted Channels
       .childChannelInitializer { channel in
-        //! FIXME: Add an option for gRPC-via-HTTP1 (pPRC).
-        return channel.pipeline.add(handler: HTTP2Parser(mode: .server)).then {
-          let multiplexer = HTTP2StreamMultiplexer { (channel, streamID) -> EventLoopFuture<Void> in
-            return channel.pipeline.add(handler: HTTP2ToHTTP1ServerCodec(streamID: streamID))
-              .then { channel.pipeline.add(handler: HTTP1ToRawGRPCServerCodec()) }
-              .then { channel.pipeline.add(handler: GRPCChannelHandler(servicesByName: servicesByName)) }
-          }
-
-          return channel.pipeline.add(handler: multiplexer)
-        }
+        return channel.pipeline.add(handler: HTTPProtocolSwitcher {
+          channel -> EventLoopFuture<Void> in
+          return channel.pipeline.add(handler: HTTP1ToRawGRPCServerCodec())
+            .then { channel.pipeline.add(handler: GRPCChannelHandler(servicesByName: servicesByName)) }
+        })
       }
 
       // Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels

+ 123 - 19
Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift

@@ -1,6 +1,7 @@
 import Foundation
 import NIO
 import NIOHTTP1
+import NIOFoundationCompat
 
 /// Incoming gRPC package with an unknown message type (represented by a byte buffer).
 public enum RawGRPCServerRequestPart {
@@ -27,6 +28,16 @@ public enum RawGRPCServerResponsePart {
 ///
 /// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
 public final class HTTP1ToRawGRPCServerCodec {
+  /// Expected content types for incoming requests.
+  private enum ContentType: String {
+    /// Binary encoded gRPC request.
+    case binary = "application/grpc"
+    /// Base64 encoded gRPC-Web request.
+    case text = "application/grpc-web-text"
+    /// Binary encoded gRPC-Web request.
+    case web = "application/grpc-web"
+  }
+
   private enum State {
     case expectingHeaders
     case expectingCompressedFlag
@@ -43,13 +54,28 @@ public final class HTTP1ToRawGRPCServerCodec {
 
   private var state = State.expectingHeaders
 
-  private var buffer: NIO.ByteBuffer?
+  private var contentType: ContentType?
+
+  // The following buffers use force unwrapping explicitly. With optionals, developers
+  // are encouraged to unwrap them using guard-else statements. These don't work cleanly
+  // with structs, since the guard-else would create a new copy of the struct, which
+  // would then have to be re-assigned into the class variable for the changes to take effect.
+  // By force unwrapping, we avoid those reassignments, and the code is a bit cleaner.
+
+  // Buffer to store binary encoded protos as they're being received.
+  private var binaryRequestBuffer: NIO.ByteBuffer!
+
+  // Buffers to store text encoded protos. Only used when content-type is application/grpc-web-text.
+  // TODO(kaipi): Extract all gRPC Web processing logic into an independent handler only added on
+  // the HTTP1.1 pipeline, as it's starting to get in the way of readability.
+  private var requestTextBuffer: NIO.ByteBuffer!
+  private var responseTextBuffer: NIO.ByteBuffer!
 }
 
 extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
   public typealias InboundIn = HTTPServerRequestPart
   public typealias InboundOut = RawGRPCServerRequestPart
-  
+
   public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
     switch self.unwrapInboundIn(data) {
     case .head(let requestHead):
@@ -57,15 +83,41 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
         else { preconditionFailure("received headers while in state \(state)") }
 
       state = .expectingCompressedFlag
-      buffer = ctx.channel.allocator.buffer(capacity: 5)
-
+      binaryRequestBuffer = ctx.channel.allocator.buffer(capacity: 5)
+      if let contentTypeHeader = requestHead.headers["content-type"].first {
+        contentType = ContentType(rawValue: contentTypeHeader)
+      } else {
+        // If the Content-Type is not present, assume the request is binary encoded gRPC.
+        contentType = .binary
+      }
+      if contentType == .text {
+        requestTextBuffer = ctx.channel.allocator.buffer(capacity: 0)
+      }
       ctx.fireChannelRead(self.wrapInboundOut(.head(requestHead)))
 
     case .body(var body):
-      guard var buffer = buffer
-        else { preconditionFailure("buffer not initialized") }
+      precondition(binaryRequestBuffer != nil, "buffer not initialized")
       assert(state.expectingBody, "received body while in state \(state)")
-      buffer.write(buffer: &body)
+
+      // If the contentType is text, then decode the incoming bytes as base64 encoded, and append
+      // it to the binary buffer. If the request is chunked, this section will process the text
+      // in the biggest chunk that is multiple of 4, leaving the unread bytes in the textBuffer
+      // where it will expect a new incoming chunk.
+      if contentType == .text {
+        precondition(requestTextBuffer != nil)
+        requestTextBuffer.write(buffer: &body)
+        // Read in chunks of 4 bytes as base64 encoded strings will always be multiples of 4.
+        let readyBytes = requestTextBuffer.readableBytes - (requestTextBuffer.readableBytes % 4)
+        guard let base64Encoded = requestTextBuffer.readString(length:readyBytes),
+            let decodedData = Data(base64Encoded: base64Encoded) else {
+          //! FIXME: Improve error handling when the message couldn't be decoded as base64.
+          ctx.close(mode: .all, promise: nil)
+          return
+        }
+        binaryRequestBuffer.write(bytes: decodedData)
+      } else {
+        binaryRequestBuffer.write(buffer: &body)
+      }
 
       // Iterate over all available incoming data, trying to read length-delimited messages.
       // Each message has the following format:
@@ -76,22 +128,22 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
         switch state {
         case .expectingHeaders: preconditionFailure("unexpected state \(state)")
         case .expectingCompressedFlag:
-          guard let compressionFlag: Int8 = buffer.readInteger() else { break requestProcessing }
+          guard let compressionFlag: Int8 = binaryRequestBuffer.readInteger() else { break requestProcessing }
           //! FIXME: Avoid crashing here and instead drop the connection.
           precondition(compressionFlag == 0, "unexpected compression flag \(compressionFlag); compression is not supported and we did not indicate support for it")
           state = .expectingMessageLength
 
         case .expectingMessageLength:
-          guard let messageLength: UInt32 = buffer.readInteger() else { break requestProcessing }
+          guard let messageLength: UInt32 = binaryRequestBuffer.readInteger() else { break requestProcessing }
           state = .receivedMessageLength(messageLength)
 
         case .receivedMessageLength(let messageLength):
-          guard let messageBytes = buffer.readBytes(length: numericCast(messageLength)) else { break }
+          guard let messageBytes = binaryRequestBuffer.readBytes(length: numericCast(messageLength)) else { break }
 
           //! FIXME: Use a slice of this buffer instead of copying to a new buffer.
-          var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.count)
-          responseBuffer.write(bytes: messageBytes)
-          ctx.fireChannelRead(self.wrapInboundOut(.message(responseBuffer)))
+          var messageBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.count)
+          messageBuffer.write(bytes: messageBytes)
+          ctx.fireChannelRead(self.wrapInboundOut(.message(messageBuffer)))
           //! FIXME: Call buffer.discardReadBytes() here?
           //! ALTERNATIVE: Check if the buffer has no further data right now, then clear it.
 
@@ -113,25 +165,77 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler {
 extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler {
   public typealias OutboundIn = RawGRPCServerResponsePart
   public typealias OutboundOut = HTTPServerResponsePart
-  
+
   public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
     let responsePart = self.unwrapOutboundIn(data)
     switch responsePart {
-    case .headers(let headers):
-      //! FIXME: Should return a different version if we want to support pPRC.
-      ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: .init(major: 2, minor: 0), status: .ok, headers: headers))), promise: promise)
+    case .headers:
+      var headers = HTTPHeaders()
+      var version = HTTPVersion(major: 2, minor: 0)
+      if let contentType = contentType {
+        headers.add(name: "content-type", value: contentType.rawValue)
+        if contentType != .binary {
+          version = .init(major: 1, minor: 1)
+        }
+      }
+
+      if contentType == .text {
+        responseTextBuffer = ctx.channel.allocator.buffer(capacity: 0)
+      }
+
+      ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise)
     case .message(var messageBytes):
       // Write out a length-delimited message payload. See `channelRead` fpor the corresponding format.
       var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.readableBytes + 5)
       responseBuffer.write(integer: Int8(0))  // Compression flag: no compression
       responseBuffer.write(integer: UInt32(messageBytes.readableBytes))
       responseBuffer.write(buffer: &messageBytes)
-      ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise)
+
+      if contentType == .text {
+        precondition(responseTextBuffer != nil)
+        // Store the response into an independent buffer. We can't return the message directly as
+        // it needs to be aggregated with all the responses plus the trailers, in order to have
+        // the base64 response properly encoded in a single byte stream.
+        responseTextBuffer!.write(buffer: &responseBuffer)
+        // Since we stored the written data, mark the write promise as successful so that the
+        // ServerStreaming provider continues sending the data.
+        promise?.succeed(result: Void())
+      } else {
+        ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise)
+      }
+
     case .status(let status):
       var trailers = status.trailingMetadata
       trailers.add(name: "grpc-status", value: String(describing: status.code.rawValue))
       trailers.add(name: "grpc-message", value: status.message)
-      ctx.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
+
+      if contentType == .text {
+        precondition(responseTextBuffer != nil)
+
+        // Encode the trailers into the response byte stream as a length delimited message, as per
+        // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
+        let textTrailers = trailers.map { name, value in "\(name): \(value)" }.joined(separator: "\r\n")
+        responseTextBuffer.write(integer: UInt8(0x80))
+        responseTextBuffer.write(integer: UInt32(textTrailers.utf8.count))
+        responseTextBuffer.write(string: textTrailers)
+
+        // TODO: Binary responses that are non multiples of 3 will end = or == when encoded in
+        // base64. Investigate whether this might have any effect on the transport mechanism and
+        // client decoding. Initial results say that they are inocuous, but we might have to keep
+        // an eye on this in case something trips up.
+        if let binaryData = responseTextBuffer.readData(length: responseTextBuffer.readableBytes) {
+          let encodedData = binaryData.base64EncodedString()
+          responseTextBuffer.clear()
+          responseTextBuffer.reserveCapacity(encodedData.utf8.count)
+          responseTextBuffer.write(string: encodedData)
+        }
+        // After collecting all response for gRPC Web connections, send one final aggregated
+        // response.
+        ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseTextBuffer))), promise: promise)
+        ctx.write(self.wrapOutboundOut(.end(nil)), promise: promise)
+      } else {
+        ctx.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
+      }
     }
   }
 }

+ 97 - 0
Sources/SwiftGRPCNIO/HTTPProtocolSwitcher.swift

@@ -0,0 +1,97 @@
+import Foundation
+import NIO
+import NIOHTTP1
+import NIOHTTP2
+
+/// Channel handler that creates different processing pipelines depending on whether
+/// the incoming request is HTTP 1 or 2.
+public class HTTPProtocolSwitcher {
+  private let handlersInitializer: ((Channel) -> EventLoopFuture<Void>)
+
+  public init(handlersInitializer: (@escaping (Channel) -> EventLoopFuture<Void>)) {
+    self.handlersInitializer = handlersInitializer
+  }
+}
+
+extension HTTPProtocolSwitcher: ChannelInboundHandler {
+  public typealias InboundIn = ByteBuffer
+  public typealias InboundOut = ByteBuffer
+
+  enum HTTPProtocolVersionError: Error {
+    /// Raised when it wasn't possible to detect HTTP Protocol version.
+    case invalidHTTPProtocolVersion
+
+    var localizedDescription: String {
+      switch self {
+      case .invalidHTTPProtocolVersion:
+        return "Could not identify HTTP Protocol Version"
+      }
+    }
+  }
+
+  /// HTTP Protocol Version type
+  enum HTTPProtocolVersion {
+    case http1
+    case http2
+  }
+
+  public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
+    // Detect the HTTP protocol version for the incoming request, or error out if it
+    // couldn't be detected.
+    var inBuffer = unwrapInboundIn(data)
+    guard let initialData = inBuffer.readString(length: inBuffer.readableBytes),
+          let preamble = initialData.split(separator: "\r\n",
+                                           maxSplits: 1,
+                                           omittingEmptySubsequences: true).first,
+          let version = protocolVersion(String(preamble)) else {
+
+      ctx.fireErrorCaught(HTTPProtocolVersionError.invalidHTTPProtocolVersion)
+      return
+    }
+
+    // Depending on whether it is HTTP1 or HTTP2, created different processing pipelines.
+    // Inbound handlers in handlersInitializer should expect HTTPServerRequestPart objects
+    // and outbound handlers should return HTTPServerResponsePart objects.
+    switch version {
+    case .http1:
+      // Upgrade connections are not handled since gRPC connections already arrive in HTTP2,
+      // while gRPC-Web does not support HTTP2 at all, so there are no compelling use cases
+      // to support this.
+      _ = ctx.pipeline.configureHTTPServerPipeline(withErrorHandling: true)
+        .then { ctx.pipeline.add(handler: WebCORSHandler()) }
+        .then { (Void) -> EventLoopFuture<Void> in self.handlersInitializer(ctx.channel) }
+    case .http2:
+      _ = ctx.pipeline.add(handler: HTTP2Parser(mode: .server))
+        .then { () -> EventLoopFuture<Void> in
+          let multiplexer = HTTP2StreamMultiplexer { (channel, streamID) -> EventLoopFuture<Void> in
+            return channel.pipeline.add(handler: HTTP2ToHTTP1ServerCodec(streamID: streamID))
+              .then { (Void) -> EventLoopFuture<Void> in self.handlersInitializer(channel) }
+          }
+          return ctx.pipeline.add(handler: multiplexer)
+        }
+    }
+
+    ctx.fireChannelRead(data)
+    _ = ctx.pipeline.remove(ctx: ctx)
+  }
+
+  /// Peek into the first line of the packet to check which HTTP version is being used.
+  private func protocolVersion(_ preamble: String) -> HTTPProtocolVersion? {
+    let range = NSRange(location: 0, length: preamble.utf16.count)
+    let regex = try! NSRegularExpression(pattern: "^.*HTTP/(\\d)\\.\\d$")
+    let result = regex.firstMatch(in: preamble, options: [], range: range)!
+
+    let versionRange = result.range(at: 1)
+    let start = String.UTF16Index(encodedOffset: versionRange.location)
+    let end = String.UTF16Index(encodedOffset: versionRange.location + versionRange.length)
+
+    switch String(preamble.utf16[start..<end])! {
+    case "1":
+      return .http1
+    case "2":
+      return .http2
+    default:
+      return nil
+    }
+  }
+}

+ 73 - 0
Sources/SwiftGRPCNIO/WebCORSHandler.swift

@@ -0,0 +1,73 @@
+import NIO
+import NIOHTTP1
+
+/// Handler that manages the CORS protocol for requests incoming from the browser.
+public class WebCORSHandler {
+  var requestMethod: HTTPMethod?
+}
+
+extension WebCORSHandler: ChannelInboundHandler {
+  public typealias InboundIn = HTTPServerRequestPart
+  public typealias OutboundOut = HTTPServerResponsePart
+
+  public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
+    // If the request is OPTIONS, the request is not propagated further.
+    switch self.unwrapInboundIn(data) {
+    case .head(let requestHead):
+      requestMethod = requestHead.method
+      if requestMethod == .OPTIONS {
+        var headers = HTTPHeaders()
+        headers.add(name: "Access-Control-Allow-Origin", value: "*")
+        headers.add(name: "Access-Control-Allow-Methods", value: "POST")
+        headers.add(name: "Access-Control-Allow-Headers",
+                    value: "content-type,x-grpc-web,x-user-agent")
+        headers.add(name: "Access-Control-Max-Age", value: "86400")
+        ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: requestHead.version,
+                                                              status: .ok,
+                                                              headers: headers))),
+                  promise: nil)
+        return
+      }
+    case .body:
+      if requestMethod == .OPTIONS {
+        // OPTIONS requests do not have a body, but still handle this case to be
+        // cautious.
+        return
+      }
+
+    case .end:
+      if requestMethod == .OPTIONS {
+        ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
+        requestMethod = nil
+        return
+      }
+    }
+    // The OPTIONS request should be fully handled at this point.
+    ctx.fireChannelRead(data)
+  }
+}
+
+extension WebCORSHandler: ChannelOutboundHandler {
+  public typealias OutboundIn = HTTPServerResponsePart
+
+  public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
+    let responsePart = self.unwrapOutboundIn(data)
+    switch responsePart {
+    case .head(let responseHead):
+      var headers = responseHead.headers
+      // CORS requires all requests to have an Allow-Origin header.
+      headers.add(name: "Access-Control-Allow-Origin", value: "*")
+      //! FIXME: Check whether we can let browsers keep connections alive. It's not possible
+      // now as the channel has a state that can't be reused since the pipeline is modified to
+      // inject the gRPC call handler.
+      headers.add(name: "Connection", value: "close")
+
+      ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: responseHead.version,
+                                                            status: responseHead.status,
+                                                            headers: headers))),
+                promise: promise)
+    default:
+      ctx.write(data, promise: promise)
+    }
+  }
+}

+ 1 - 1
Tests/SwiftGRPCNIOTests/NIOServerTests.swift

@@ -66,7 +66,7 @@ final class EchoProvider_NIO: Echo_EchoProvider_NIO {
         response.text = "Swift echo update (\(count)): \(message.text)"
         endOfSendOperationQueue = endOfSendOperationQueue.then { context.sendResponse(response) }
         count += 1
-        
+
       case .end:
         endOfSendOperationQueue
           .map { GRPCStatus.ok }

+ 164 - 0
Tests/SwiftGRPCNIOTests/NIOServerWebTests.swift

@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+import NIO
+@testable import SwiftGRPCNIO
+import XCTest
+
+// Only test Unary and ServerStreaming, as ClientStreaming is not
+// supported in HTTP1.
+// TODO: Add tests for application/grpc-web as well.
+class NIOServerWebTests: NIOServerTestCase {
+  static var allTests: [(String, (NIOServerWebTests) -> () throws -> Void)] {
+    return [
+      ("testUnary", testUnary),
+      ("testUnaryLotsOfRequests", testUnaryLotsOfRequests),
+      ("testServerStreaming", testServerStreaming),
+    ]
+  }
+
+  var eventLoopGroup: MultiThreadedEventLoopGroup!
+  var server: GRPCServer!
+
+  override func setUp() {
+    super.setUp()
+
+    // This is how a GRPC server would actually be set up.
+    eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
+    server = try! GRPCServer.start(
+      hostname: "localhost", port: 5050, eventLoopGroup: eventLoopGroup, serviceProviders: [EchoProvider_NIO()])
+      .wait()
+  }
+
+  override func tearDown() {
+    XCTAssertNoThrow(try server.close().wait())
+
+    XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
+    eventLoopGroup = nil
+
+    super.tearDown()
+  }
+
+  private func gRPCEncodedEchoRequest(_ text: String) -> Data {
+    var request = Echo_EchoRequest()
+    request.text = text
+    var data = try! request.serializedData()
+    // Add the gRPC prefix with the compression byte and the 4 length bytes.
+    for i in 0..<4 {
+      data.insert(UInt8((data.count >> (i * 8)) & 0xFF), at: 0)
+    }
+    data.insert(UInt8(0), at: 0)
+    return data
+  }
+
+  private func gRPCWebOKTrailers() -> Data {
+    var data = "grpc-status: 0\r\ngrpc-message: OK".data(using: .utf8)!
+    // Add the gRPC prefix with the compression byte and the 4 length bytes.
+    for i in 0..<4 {
+      data.insert(UInt8((data.count >> (i * 8)) & 0xFF), at: 0)
+    }
+    data.insert(UInt8(0x80), at: 0)
+    return data
+  }
+
+  private func sendOverHTTP1(rpcMethod: String, message: String, handler: @escaping (Data?, Error?) -> Void) {
+    let serverURL = URL(string: "http://localhost:5050/echo.Echo/\(rpcMethod)")!
+    var request = URLRequest(url: serverURL)
+    request.httpMethod = "POST"
+    request.setValue("application/grpc-web-text", forHTTPHeaderField: "content-type")
+
+    request.httpBody = gRPCEncodedEchoRequest(message).base64EncodedData()
+
+    let sem = DispatchSemaphore(value: 0)
+    URLSession.shared.dataTask(with: request) { (data, response, error) in
+      handler(data, error)
+      sem.signal()
+    }.resume()
+    _ = sem.wait()
+  }
+}
+
+extension NIOServerWebTests {
+  func testUnary() {
+    let message = "hello, world!"
+    let expectedData = gRPCEncodedEchoRequest("Swift echo get: \(message)") + gRPCWebOKTrailers()
+    let expectedResponse = expectedData.base64EncodedString()
+
+    let completionHandlerExpectation = expectation(description: "completion handler called")
+
+    sendOverHTTP1(rpcMethod: "Get", message: message) { data, error in
+      XCTAssertNil(error)
+      if let data = data {
+        XCTAssertEqual(String(data: data, encoding: .utf8), expectedResponse)
+        completionHandlerExpectation.fulfill()
+      }
+    }
+
+    waitForExpectations(timeout: defaultTimeout)
+  }
+
+  func testUnaryLotsOfRequests() {
+    // Sending that many requests at once can sometimes trip things up, it seems.
+    let clockStart = clock()
+    let numberOfRequests = 2_000
+    let completionHandlerExpectation = expectation(description: "completion handler called")
+#if os(macOS)
+    // Linux version of Swift doesn't have this API yet.
+    // Implemented in https://github.com/apple/swift-corelibs-xctest/pull/228 but not yet
+    // released.
+    completionHandlerExpectation.expectedFulfillmentCount = numberOfRequests
+#endif
+    for i in 0..<numberOfRequests {
+      let message = "foo \(i)"
+      let expectedData = gRPCEncodedEchoRequest("Swift echo get: \(message)") + gRPCWebOKTrailers()
+      let expectedResponse = expectedData.base64EncodedString()
+      sendOverHTTP1(rpcMethod: "Get", message: message) { data, error in
+        XCTAssertNil(error)
+        if let data = data {
+          XCTAssertEqual(String(data: data, encoding: .utf8), expectedResponse)
+          completionHandlerExpectation.fulfill()
+        }
+      }
+    }
+    waitForExpectations(timeout: 10)
+    print("total time for \(numberOfRequests) requests: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
+  }
+
+  func testServerStreaming() {
+    let message = "foo bar baz"
+
+
+    var expectedData = Data()
+    var index = 0
+    message.split(separator: " ").forEach { (component) in
+      expectedData.append(gRPCEncodedEchoRequest("Swift echo expand (\(index)): \(component)"))
+      index += 1
+    }
+    expectedData.append(gRPCWebOKTrailers())
+    let expectedResponse = expectedData.base64EncodedString()
+    let completionHandlerExpectation = expectation(description: "completion handler called")
+
+    sendOverHTTP1(rpcMethod: "Expand", message: message) { data, error in
+      XCTAssertNil(error)
+      if let data = data {
+        XCTAssertEqual(String(data: data, encoding: .utf8), expectedResponse)
+        completionHandlerExpectation.fulfill()
+      }
+    }
+
+    waitForExpectations(timeout: defaultTimeout)
+  }
+}