-
Notifications
You must be signed in to change notification settings - Fork 66
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
http (internal): Split Sync/Acync client code files (#3210)
- Loading branch information
Showing
3 changed files
with
291 additions
and
251 deletions.
There are no files selected for viewing
144 changes: 144 additions & 0 deletions
144
airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package wvlet.airframe.http.client | ||
|
||
import wvlet.airframe.control.CircuitBreaker | ||
import wvlet.airframe.http.HttpMessage.{Request, Response} | ||
import wvlet.airframe.http.{HttpClientException, HttpLogger, RPCException, RPCMethod} | ||
import wvlet.airframe.rx.Rx | ||
import wvlet.airframe.surface.Surface | ||
|
||
/** | ||
* A standard async http client interface for Rx[_] | ||
*/ | ||
trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient] with AutoCloseable { | ||
protected def channel: HttpChannel | ||
def config: HttpClientConfig | ||
|
||
private val httpLogger: HttpLogger = config.newHttpLogger | ||
private val loggingFilter: HttpClientFilter = config.newLoggingFilter(httpLogger) | ||
private val circuitBreaker: CircuitBreaker = config.circuitBreaker | ||
|
||
override def close(): Unit = { | ||
httpLogger.close() | ||
} | ||
|
||
/** | ||
* Send an HTTP request and get the response in Rx[Response] type. | ||
* | ||
* It will return `Rx[HttpClientException]` for non-successful responses. For example, when receiving non-retryable | ||
* status code (e.g., 4xx), it will return Rx[HttpClientException]. For server side failures (5xx responses), this | ||
* continues request retry until the max retry count. | ||
* | ||
* If it exceeds the number of max retry attempts, it will return Rx[HttpClientMaxRetryException]. | ||
*/ | ||
def send(req: Request, context: HttpClientContext = HttpClientContext.empty): Rx[Response] = { | ||
val request = config.requestFilter(req) | ||
var lastResponse: Option[Response] = None | ||
config.retryContext | ||
.runAsyncWithContext(request, circuitBreaker) { | ||
loggingFilter | ||
.andThen(config.clientFilter) | ||
.apply(context) | ||
.andThen(req => channel.sendAsync(req, config)) | ||
.apply(request) | ||
.map { resp => | ||
// Remember the last response for error reporting purpose | ||
lastResponse = Some(config.responseFilter(resp)) | ||
resp | ||
} | ||
} | ||
.recover { | ||
HttpClients.defaultHttpClientErrorHandler(lastResponse) | ||
} | ||
} | ||
|
||
/** | ||
* Send an HTTP request and returns a response (or the last response if the request is retried) | ||
* | ||
* @param req | ||
* @return | ||
*/ | ||
def sendSafe(req: Request, context: HttpClientContext = HttpClientContext.empty): Rx[Response] = { | ||
send(req, context).toRx.recover { case e: HttpClientException => | ||
e.response.toHttpResponse | ||
} | ||
} | ||
|
||
def readAsInternal[Resp]( | ||
req: Request, | ||
responseSurface: Surface | ||
): Rx[Resp] = { | ||
send(req).toRx.map { resp => | ||
HttpClients.parseResponse[Resp](config, responseSurface, resp) | ||
} | ||
} | ||
|
||
def callInternal[Req, Resp]( | ||
req: Request, | ||
requestSurface: Surface, | ||
responseSurface: Surface, | ||
requestContent: Req | ||
): Rx[Resp] = { | ||
Rx | ||
.const(HttpClients.prepareRequest(config, req, requestSurface, requestContent)) | ||
.flatMap { (newRequest: Request) => | ||
send(newRequest, HttpClientContext(config.name)).toRx.map { resp => | ||
HttpClients.parseResponse[Resp](config, responseSurface, resp) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* @param method | ||
* @param requestContent | ||
* @tparam Req | ||
* @tparam Resp | ||
* @return | ||
* | ||
* @throws RPCException | ||
* when RPC request fails | ||
*/ | ||
def rpc[Req, Resp]( | ||
method: RPCMethod, | ||
requestContent: Req | ||
): Rx[Resp] = { | ||
Rx | ||
.const(HttpClients.prepareRPCRequest(config, method.path, method.requestSurface, requestContent)) | ||
.flatMap { (request: Request) => | ||
val context = HttpClientContext( | ||
clientName = config.name, | ||
rpcMethod = Some(method), | ||
rpcInput = Some(requestContent) | ||
) | ||
sendSafe(request, context).toRx | ||
.map { (response: Response) => | ||
if (response.status.isSuccessful) { | ||
val ret = HttpClients.parseRPCResponse(config, response, method.responseSurface) | ||
ret.asInstanceOf[Resp] | ||
} else { | ||
throw RPCException.fromResponse(response) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
class AsyncClientImpl(protected val channel: HttpChannel, val config: HttpClientConfig) extends AsyncClient { | ||
override protected def build(newConfig: HttpClientConfig): AsyncClient = new AsyncClientImpl(channel, newConfig) | ||
override def close(): Unit = { | ||
super.close() | ||
channel.close() | ||
} | ||
} |
Oops, something went wrong.