2
0
Эх сурвалжийг харах

First attempt at writing client-streaming (collect) and server-streaming (expand) API clients

Tim Burks 9 жил өмнө
parent
commit
f9cdb3a098

+ 2 - 0
Examples/Echo/Go/go/src/client/client.go

@@ -144,6 +144,7 @@ func call_expand(c pb.EchoClient, message string) {
 	if err != nil {
 		panic(err)
 	}
+	waitc := make(chan struct{})
 	for {
 		in, err := stream.Recv()
 		if err == io.EOF {
@@ -156,4 +157,5 @@ func call_expand(c pb.EchoClient, message string) {
 		}
 		log.Printf("Received: %s", in.Text)
 	}
+	<-waitc
 }

+ 9 - 5
Examples/Echo/Go/go/src/server/server.go

@@ -21,6 +21,7 @@ import (
 	"log"
 	"net"
 	"strings"
+	"time"
 
 	pb "echo"
 	"golang.org/x/net/context"
@@ -38,7 +39,7 @@ var echoServer EchoServer
 // [START get]
 func (s *EchoServer) Get(ctx context.Context, r *pb.EchoRequest) (*pb.EchoResponse, error) {
 	response := &pb.EchoResponse{}
-	response.Text = "Go nonstreaming echo " + r.Text
+	response.Text = "Go echo get: " + r.Text
 	fmt.Printf("Get received: %s\n", r.Text)
 	return response, nil
 }
@@ -46,6 +47,7 @@ func (s *EchoServer) Get(ctx context.Context, r *pb.EchoRequest) (*pb.EchoRespon
 // [END get]
 
 func (s *EchoServer) Update(stream pb.Echo_UpdateServer) error {
+	count := 0
 	for {
 		in, err := stream.Recv()
 		if err == io.EOF {
@@ -56,7 +58,8 @@ func (s *EchoServer) Update(stream pb.Echo_UpdateServer) error {
 		}
 
 		response := &pb.EchoResponse{}
-		response.Text = "Go streaming echo " + in.Text
+		response.Text = fmt.Sprintf("Go echo update (%d): %s", count, in.Text)
+		count++
 
 		fmt.Printf("Update received: %s\n", in.Text)
 
@@ -80,7 +83,7 @@ func (s *EchoServer) Collect(stream pb.Echo_CollectServer) error {
 		parts = append(parts, in.Text)
 	}
 	response := &pb.EchoResponse{}
-	response.Text = strings.Join(parts, " ")
+	response.Text = fmt.Sprintf("Go echo collect: %s", strings.Join(parts, " "))
 	if err := stream.SendAndClose(response); err != nil {
 		return err
 	}
@@ -91,12 +94,13 @@ func (s *EchoServer) Expand(request *pb.EchoRequest, stream pb.Echo_ExpandServer
 	fmt.Printf("Expand received: %s\n", request.Text)
 	parts := strings.Split(request.Text, " ")
 
-	for _, part := range parts {
+	for i, part := range parts {
 		response := &pb.EchoResponse{}
-		response.Text = part
+		response.Text = fmt.Sprintf("Go echo expand (%d): %s", i, part)
 		if err := stream.Send(response); err != nil {
 			return err
 		}
+		time.Sleep(1*time.Second)
 	}
 
 	return nil

BIN
Examples/Echo/Go/message.png


+ 29 - 12
Examples/Echo/Swift/Echo/Base.lproj/MainMenu.xib

@@ -1,8 +1,8 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<document type="com.apple.InterfaceBuilder3.Cocoa.XIB" version="3.0" toolsVersion="11198.2" systemVersion="15G31" targetRuntime="MacOSX.Cocoa" propertyAccessControl="none" useAutolayout="YES" customObjectInstantitationMethod="direct">
+<document type="com.apple.InterfaceBuilder3.Cocoa.XIB" version="3.0" toolsVersion="11201" systemVersion="15G1004" targetRuntime="MacOSX.Cocoa" propertyAccessControl="none" useAutolayout="YES" customObjectInstantitationMethod="direct">
     <dependencies>
         <deployment identifier="macosx"/>
-        <plugIn identifier="com.apple.InterfaceBuilder.CocoaPlugin" version="11198.2"/>
+        <plugIn identifier="com.apple.InterfaceBuilder.CocoaPlugin" version="11201"/>
     </dependencies>
     <objects>
         <customObject id="-2" userLabel="File's Owner" customClass="NSApplication">
@@ -105,7 +105,7 @@
             <windowStyleMask key="styleMask" titled="YES" closable="YES" miniaturizable="YES"/>
             <windowPositionMask key="initialPositionMask" leftStrut="YES" rightStrut="YES" topStrut="YES" bottomStrut="YES"/>
             <rect key="contentRect" x="335" y="390" width="400" height="179"/>
-            <rect key="screenRect" x="0.0" y="0.0" width="1440" height="877"/>
+            <rect key="screenRect" x="0.0" y="0.0" width="1200" height="1897"/>
             <view key="contentView" wantsLayer="YES" id="EiT-Mj-1SZ">
                 <rect key="frame" x="0.0" y="0.0" width="400" height="179"/>
                 <autoresizingMask key="autoresizingMask"/>
@@ -132,9 +132,9 @@
                         </textFieldCell>
                     </textField>
                     <textField verticalHuggingPriority="750" fixedFrame="YES" translatesAutoresizingMaskIntoConstraints="NO" id="jnU-RF-9F0">
-                        <rect key="frame" x="20" y="61" width="360" height="22"/>
+                        <rect key="frame" x="20" y="61" width="305" height="22"/>
                         <autoresizingMask key="autoresizingMask" flexibleMaxX="YES" flexibleMinY="YES"/>
-                        <textFieldCell key="cell" scrollable="YES" lineBreakMode="clipping" selectable="YES" editable="YES" sendsActionOnEndEditing="YES" state="on" borderStyle="bezel" title="localhost:8081" placeholderString="Server Address" drawsBackground="YES" id="mDa-qc-62R">
+                        <textFieldCell key="cell" scrollable="YES" lineBreakMode="clipping" selectable="YES" editable="YES" sendsActionOnEndEditing="YES" state="on" borderStyle="bezel" title="localhost:8080" placeholderString="Server Address" drawsBackground="YES" id="mDa-qc-62R">
                             <font key="font" metaFont="system"/>
                             <color key="textColor" name="textColor" catalog="System" colorSpace="catalog"/>
                             <color key="backgroundColor" name="textBackgroundColor" catalog="System" colorSpace="catalog"/>
@@ -143,19 +143,35 @@
                             <action selector="addressReturnPressedWithSender:" target="8aG-cq-jvr" id="LPA-g7-vIb"/>
                         </connections>
                     </textField>
-                    <button fixedFrame="YES" translatesAutoresizingMaskIntoConstraints="NO" id="5WD-rH-7rC">
-                        <rect key="frame" x="18" y="18" width="84" height="25"/>
+                    <segmentedControl verticalHuggingPriority="750" fixedFrame="YES" translatesAutoresizingMaskIntoConstraints="NO" id="Xjz-PB-FD9">
+                        <rect key="frame" x="18" y="16" width="276" height="24"/>
                         <autoresizingMask key="autoresizingMask" flexibleMaxX="YES" flexibleMinY="YES"/>
-                        <buttonCell key="cell" type="check" title="Streaming" bezelStyle="regularSquare" imagePosition="left" inset="2" id="A9P-Au-0Zu">
-                            <behavior key="behavior" changeContents="YES" doesNotDimImage="YES" lightByContents="YES"/>
+                        <segmentedCell key="cell" borderStyle="border" alignment="left" style="rounded" trackingMode="selectOne" id="55X-rv-QL3">
+                            <font key="font" metaFont="system"/>
+                            <segments>
+                                <segment label="Get" width="68" selected="YES"/>
+                                <segment label="Expand" width="67" tag="1"/>
+                                <segment label="Collect" width="67"/>
+                                <segment label="Update"/>
+                            </segments>
+                        </segmentedCell>
+                        <connections>
+                            <action selector="buttonValueChangedWithSender:" target="8aG-cq-jvr" id="6MK-R9-4JD"/>
+                        </connections>
+                    </segmentedControl>
+                    <button verticalHuggingPriority="750" fixedFrame="YES" translatesAutoresizingMaskIntoConstraints="NO" id="lTw-do-2ef">
+                        <rect key="frame" x="308" y="11" width="75" height="32"/>
+                        <autoresizingMask key="autoresizingMask" flexibleMaxX="YES" flexibleMinY="YES"/>
+                        <buttonCell key="cell" type="push" title="Close" bezelStyle="rounded" alignment="center" borderStyle="border" imageScaling="proportionallyDown" inset="2" id="d5c-kK-Cnr">
+                            <behavior key="behavior" pushIn="YES" lightByBackground="YES" lightByGray="YES"/>
                             <font key="font" metaFont="system"/>
                         </buttonCell>
                         <connections>
-                            <action selector="buttonValueChangedWithSender:" target="8aG-cq-jvr" id="hJq-8b-Uoe"/>
+                            <action selector="closeButtonPressedWithSender:" target="8aG-cq-jvr" id="CSt-4Z-6Pf"/>
                         </connections>
                     </button>
                     <button fixedFrame="YES" translatesAutoresizingMaskIntoConstraints="NO" id="1Ls-ri-Jup">
-                        <rect key="frame" x="170" y="21" width="46" height="18"/>
+                        <rect key="frame" x="336" y="63" width="46" height="18"/>
                         <autoresizingMask key="autoresizingMask" flexibleMaxX="YES" flexibleMinY="YES"/>
                         <buttonCell key="cell" type="check" title="TLS" bezelStyle="regularSquare" imagePosition="left" inset="2" id="2hc-58-S2L">
                             <behavior key="behavior" changeContents="YES" doesNotDimImage="YES" lightByContents="YES"/>
@@ -173,9 +189,10 @@
             <connections>
                 <outlet property="TLSButton" destination="1Ls-ri-Jup" id="jan-Vv-OVl"/>
                 <outlet property="addressField" destination="jnU-RF-9F0" id="8vy-f1-cgl"/>
+                <outlet property="callSelectButton" destination="Xjz-PB-FD9" id="hac-kZ-WIN"/>
+                <outlet property="closeButton" destination="lTw-do-2ef" id="gLi-Sn-tTw"/>
                 <outlet property="messageField" destination="DcE-8I-pH3" id="G2X-oh-frG"/>
                 <outlet property="outputField" destination="EyU-Iq-sai" id="9O7-AA-RS0"/>
-                <outlet property="streamingButton" destination="5WD-rH-7rC" id="QqN-UI-iAY"/>
                 <outlet property="view" destination="EiT-Mj-1SZ" id="HXg-ep-0aP"/>
             </connections>
         </viewController>

+ 79 - 0
Examples/Echo/Swift/Echo/EchoService.swift

@@ -62,6 +62,73 @@ public class EchoGetCall {
   }
 }
 
+public class EchoExpandCall {
+  var call : Call
+
+  init(_ call: Call) {
+    self.call = call
+  }
+
+  func perform(request: Echo_EchoRequest,
+               callback:@escaping (CallResult, Echo_EchoResponse?) -> Void)
+    -> Void {
+      let requestMessageData = try! request.serializeProtobuf()
+      let requestMetadata = Metadata()
+      try! call.startServerStreaming(message: requestMessageData,
+                                     metadata: requestMetadata)
+      {(callResult) in
+        //print("Client received status \(callResult.statusCode): \(callResult.statusMessage!)")
+      }
+  }
+
+  func receiveMessage(callback:@escaping (Echo_EchoResponse?) throws -> Void) throws {
+    try call.receiveMessage() {(data) in
+      if let data = data {
+        if let responseMessage = try? Echo_EchoResponse(protobuf:data) {
+          try callback(responseMessage)
+        } else {
+          try callback(nil)
+        }
+      } else {
+        try callback(nil)
+      }
+    }
+  }
+
+}
+
+public class EchoCollectCall {
+  var call : Call
+
+  init(_ call: Call) {
+    self.call = call
+  }
+
+  func start(metadata:Metadata) throws {
+    try self.call.start(metadata: metadata)
+  }
+
+  func receiveMessage(callback:@escaping (Echo_EchoResponse?) throws -> Void) throws {
+    try call.receiveMessage() {(data) in
+      guard
+        let responseMessage = try? Echo_EchoResponse(protobuf:data)
+        else {
+          return
+      }
+      try callback(responseMessage)
+    }
+  }
+
+  func sendMessage(message: Echo_EchoRequest) {
+    let messageData = try! message.serializeProtobuf()
+    _ = call.sendMessage(data:messageData)
+  }
+
+  func close(completion:@escaping (() -> Void)) throws {
+    try call.close(completion:completion)
+  }
+}
+
 public class EchoUpdateCall {
   var call : Call
 
@@ -75,6 +142,10 @@ public class EchoUpdateCall {
 
   func receiveMessage(callback:@escaping (Echo_EchoResponse?) throws -> Void) throws {
     try call.receiveMessage() {(data) in
+      guard let data = data
+        else {
+          return
+      }
       guard
         let responseMessage = try? Echo_EchoResponse(protobuf:data)
         else {
@@ -109,6 +180,14 @@ public class EchoService {
     return EchoGetCall(channel.makeCall("/echo.Echo/Get"))
   }
 
+  func expand() -> EchoExpandCall {
+    return EchoExpandCall(channel.makeCall("/echo.Echo/Expand"))
+  }
+
+  func collect() -> EchoCollectCall {
+    return EchoCollectCall(channel.makeCall("/echo.Echo/Collect"))
+  }
+
   func update() -> EchoUpdateCall {
     return EchoUpdateCall(channel.makeCall("/echo.Echo/Update"))
   }

+ 119 - 17
Examples/Echo/Swift/Echo/EchoViewController.swift

@@ -37,10 +37,14 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
   @IBOutlet weak var messageField: NSTextField!
   @IBOutlet weak var outputField: NSTextField!
   @IBOutlet weak var addressField: NSTextField!
-  @IBOutlet weak var streamingButton: NSButton!
   @IBOutlet weak var TLSButton: NSButton!
+  @IBOutlet weak var callSelectButton: NSSegmentedControl!
+  @IBOutlet weak var closeButton: NSButton!
 
   private var service : EchoService?
+
+  private var expandCall: EchoExpandCall?
+  private var collectCall: EchoCollectCall?
   private var updateCall: EchoUpdateCall?
 
   private var nowStreaming = false
@@ -69,7 +73,7 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     }
   }
 
-  @IBAction func buttonValueChanged(sender: NSButton) {
+  @IBAction func buttonValueChanged(sender: NSSegmentedControl) {
     if (nowStreaming && (sender.intValue == 0)) {
       if let error = try? self.sendClose() {
         print(error)
@@ -77,8 +81,17 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     }
   }
 
+  @IBAction func closeButtonPressed(sender: NSButton) {
+    if (nowStreaming) {
+      if let error = try? self.sendClose() {
+        print(error)
+      }
+    }
+  }
+
   override func viewDidLoad() {
     gRPC.initialize()
+    closeButton.isEnabled = false
   }
 
   override func viewDidAppear() {
@@ -89,6 +102,9 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
   }
 
   func prepareService(address: String, host: String) {
+    if (service != nil) {
+      return
+    }
     if (TLSButton.intValue == 0) {
       service = EchoService(address:address)
     } else {
@@ -102,12 +118,14 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
   }
 
   func callServer(address:String) throws -> Void {
+    let host = "example.com"
+    prepareService(address:address, host:host)
+
     let requestMetadata = Metadata(["x-goog-api-key":"YOUR_API_KEY",
                                     "x-ios-bundle-identifier":Bundle.main.bundleIdentifier!])
-    let host = "example.com"
-    if (self.streamingButton.intValue == 0) {
+
+    if (self.callSelectButton.selectedSegment == 0) {
       // NONSTREAMING
-      prepareService(address:address, host:host)
       if let service = service {
         let call = service.get()
         var requestMessage = Echo_EchoRequest()
@@ -125,23 +143,95 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
         }
       }
     }
-    else {
-      // STREAMING
+    else if (self.callSelectButton.selectedSegment == 1) {
+      // STREAMING EXPAND
+      if (!nowStreaming) {
+        guard let service = service else {
+          return
+        }
+        expandCall = service.expand()
+        var requestMessage = Echo_EchoRequest()
+        requestMessage.text = self.messageField.stringValue
+        try expandCall!.perform(request:requestMessage) {(callResult, response) in
+        }
+        try! self.receiveExpandMessage()
+      }
+    }
+    else if (self.callSelectButton.selectedSegment == 2) {
+      // STREAMING COLLECT
+      if (!nowStreaming) {
+        guard let service = service else {
+          return
+        }
+        collectCall = service.collect()
+        try collectCall!.start(metadata:requestMetadata)
+        try self.receiveCollectMessage()
+        nowStreaming = true
+        closeButton.isEnabled = true
+      }
+      self.sendCollectMessage()
+
+    }
+    else if (self.callSelectButton.selectedSegment == 3) {
+      // STREAMING UPDATE
       if (!nowStreaming) {
-        prepareService(address:address, host:host)
         guard let service = service else {
           return
         }
         updateCall = service.update()
         try updateCall!.start(metadata:requestMetadata)
-        try self.receiveMessage()
+        try self.receiveUpdateMessage()
         nowStreaming = true
+        closeButton.isEnabled = true
       }
-      self.sendMessage()
+      self.sendUpdateMessage()
     }
   }
 
-  func sendMessage() {
+  func receiveExpandMessage() throws -> Void {
+    guard let expandCall = expandCall else {
+      return
+    }
+    try expandCall.receiveMessage() {(responseMessage) in
+      if let responseMessage = responseMessage {
+        try self.receiveExpandMessage() // prepare to receive the next message
+        DispatchQueue.main.async {
+          self.outputField.stringValue = responseMessage.text
+        }
+      } else {
+        print("expand closed")
+      }
+    }
+  }
+
+  func sendCollectMessage() {
+    if let collectCall = collectCall {
+      var requestMessage = Echo_EchoRequest()
+      requestMessage.text = self.messageField.stringValue
+      _ = collectCall.sendMessage(message:requestMessage)
+    }
+  }
+
+  func receiveCollectMessage() throws -> Void {
+    guard let collectCall = collectCall else {
+      return
+    }
+    try collectCall.receiveMessage() {(responseMessage) in
+      if let responseMessage = responseMessage {
+        DispatchQueue.main.async {
+          self.outputField.stringValue = responseMessage.text
+        }
+      } else {
+        print("collect closed")
+        self.nowStreaming = false
+        self.closeButton.isEnabled = false
+      }
+    }
+  }
+
+
+
+  func sendUpdateMessage() {
     if let updateCall = updateCall {
       var requestMessage = Echo_EchoRequest()
       requestMessage.text = self.messageField.stringValue
@@ -149,26 +239,38 @@ class EchoViewController : NSViewController, NSTextFieldDelegate {
     }
   }
 
-  func receiveMessage() throws -> Void {
+  func receiveUpdateMessage() throws -> Void {
     guard let updateCall = updateCall else {
       return
     }
     try updateCall.receiveMessage() {(responseMessage) in
-      try self.receiveMessage() // prepare to receive the next message
+      try self.receiveUpdateMessage() // prepare to receive the next message
       if let responseMessage = responseMessage {
         DispatchQueue.main.async {
           self.outputField.stringValue = responseMessage.text
         }
+      } else {
+        print("update closed")
+        self.nowStreaming = false
+        self.closeButton.isEnabled = false
       }
     }
   }
 
   func sendClose() throws {
-    guard let updateCall = updateCall else {
-      return
+    if let updateCall = updateCall {
+      try updateCall.close() {
+        self.updateCall = nil
+        self.nowStreaming = false
+        self.closeButton.isEnabled = false
+      }
     }
-    try updateCall.close() {
-      self.nowStreaming = false
+    if let collectCall = collectCall {
+      try collectCall.close() {
+        self.collectCall = nil
+        self.nowStreaming = false
+        self.closeButton.isEnabled = false
+      }
     }
   }
 }

+ 34 - 1
Packages/gRPC/Sources/Call.swift

@@ -165,7 +165,7 @@ public class Call {
   ///
   /// - Parameter message: data containing the message to send
   /// - Parameter metadata: metadata to send with the call
-  /// - Parameter callback: a blocko to call with a CallResponse object containing call results
+  /// - Parameter callback: a block to call with a CallResponse object containing call results
   public func perform(message: Data,
                       metadata: Metadata,
                       completion: @escaping (CallResult) throws -> Void)
@@ -198,6 +198,37 @@ public class Call {
       try self.perform(operations)
   }
 
+  public func startServerStreaming(message: Data,
+                                   metadata: Metadata,
+                                   completion: @escaping (CallResult) throws -> Void)
+    throws -> Void {
+      let messageBuffer = ByteBuffer(data:message)
+      let operations = OperationGroup(call:self,
+                                      operations:[.sendInitialMetadata(metadata),
+                                                  .sendMessage(messageBuffer),
+                                                  .sendCloseFromClient,
+                                                  .receiveInitialMetadata
+                                                  ],
+                                      completion:
+        {(operationGroup) in
+          if operationGroup.success {
+            try completion(CallResult(statusCode:0,
+                                      statusMessage:nil,
+                                      resultData:nil,
+                                      initialMetadata:operationGroup.receivedInitialMetadata(),
+                                      trailingMetadata:nil))
+          } else {
+            try completion(CallResult(statusCode:0,
+                                      statusMessage:nil,
+                                      resultData:nil,
+                                      initialMetadata:nil,
+                                      trailingMetadata:nil))
+          }
+      })
+
+      try self.perform(operations)
+  }
+
   /// start a streaming connection
   ///
   /// Parameter metadata: initial metadata to send
@@ -266,6 +297,8 @@ public class Call {
       if operationGroup.success {
         if let messageBuffer = operationGroup.receivedMessage() {
           try callback(messageBuffer.data())
+        } else {
+          try callback(nil) // an empty response signals the end of a connection
         }
       }
     }

BIN
gRPC.xcodeproj/project.xcworkspace/xcuserdata/timburks.xcuserdatad/UserInterfaceState.xcuserstate