Skip to content

Commit

Permalink
De-specialize unmarshalling requests of streamed replies methods
Browse files Browse the repository at this point in the history
The unmarshalling is now done the same way it is done for other methods.
  • Loading branch information
cdevienne committed Sep 18, 2018
1 parent 32d288d commit c59a3ad
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 85 deletions.
129 changes: 73 additions & 56 deletions examples/alloptions/alloptions.nrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,7 @@ func (h *SvcCustomSubjectHandler) MtNoRequestPublish(pkginstance string, msg Sim
}

func (h *SvcCustomSubjectHandler) MtStreamedReplyHandler(
ctx context.Context, request *nrpc.Request, data []byte,
) {
_, encoding, err := nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("SvcCustomSubject: MtStreamedReply subject parsing failed:")
}
request.Encoding = encoding
var req StringArg
if err := nrpc.Unmarshal(encoding, data, &req); err != nil {
// Handle error
return
}

ctx context.Context, request *nrpc.Request, req StringArg) {
ctx, cancel := context.WithCancel(ctx)

keepStreamAlive := nrpc.NewKeepStreamAlive(
Expand All @@ -75,13 +63,13 @@ func (h *SvcCustomSubjectHandler) MtStreamedReplyHandler(

_, nrpcErr := nrpc.CaptureErrors(func() (proto.Message, error) {
err := h.server.MtStreamedReply(ctx, req, func(rep SimpleStringReply){
if err = request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
if err := request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
return nil, err
})
keepStreamAlive.Stop()
Expand All @@ -96,14 +84,7 @@ func (h *SvcCustomSubjectHandler) MtStreamedReplyHandler(
}

func (h *SvcCustomSubjectHandler) MtVoidReqStreamedReplyHandler(
ctx context.Context, request *nrpc.Request, data []byte,
) {
_, encoding, err := nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("SvcCustomSubject: MtVoidReqStreamedReply subject parsing failed:")
}
request.Encoding = encoding

ctx context.Context, request *nrpc.Request) {
ctx, cancel := context.WithCancel(ctx)

keepStreamAlive := nrpc.NewKeepStreamAlive(
Expand All @@ -114,13 +95,13 @@ func (h *SvcCustomSubjectHandler) MtVoidReqStreamedReplyHandler(

_, nrpcErr := nrpc.CaptureErrors(func() (proto.Message, error) {
err := h.server.MtVoidReqStreamedReply(ctx, func(rep SimpleStringReply){
if err = request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
if err := request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
return nil, err
})
keepStreamAlive.Stop()
Expand Down Expand Up @@ -200,11 +181,39 @@ func (h *SvcCustomSubjectHandler) Handler(msg *nats.Msg) {
// MtNoRequest is a no-request method. Ignore it.
return
case "mtstreamedreply":
h.MtStreamedReplyHandler(h.ctx, request, msg.Data)
return
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("MtStreamedReplyHanlder: MtStreamedReply subject parsing failed: %v", err)
break
}
var req StringArg
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
log.Printf("MtStreamedReplyHandler: MtStreamedReply request unmarshal failed: %v", err)
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "bad request received: " + err.Error(),
}
} else {
h.MtStreamedReplyHandler(h.ctx, request, req)
return
}
case "mtvoidreqstreamedreply":
h.MtVoidReqStreamedReplyHandler(h.ctx, request, msg.Data)
return
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("MtVoidReqStreamedReplyHanlder: MtVoidReqStreamedReply subject parsing failed: %v", err)
break
}
var req github_com_nats_rpc_nrpc.Void
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
log.Printf("MtVoidReqStreamedReplyHandler: MtVoidReqStreamedReply request unmarshal failed: %v", err)
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "bad request received: " + err.Error(),
}
} else {
h.MtVoidReqStreamedReplyHandler(h.ctx, request)
return
}
default:
log.Printf("SvcCustomSubjectHandler: unknown name %q", name)
immediateError = &nrpc.Error{
Expand Down Expand Up @@ -420,14 +429,7 @@ func (h *SvcSubjectParamsHandler) Subject() string {
}

func (h *SvcSubjectParamsHandler) MtStreamedReplyWithSubjectParamsHandler(
ctx context.Context, request *nrpc.Request, data []byte,
) {
mtParams, encoding, err := nrpc.ParseSubjectTail(2, request.SubjectTail)
if err != nil {
log.Printf("SvcSubjectParams: MtStreamedReplyWithSubjectParams subject parsing failed:")
}
request.Encoding = encoding

ctx context.Context, request *nrpc.Request, mtParams []string) {
ctx, cancel := context.WithCancel(ctx)

keepStreamAlive := nrpc.NewKeepStreamAlive(
Expand All @@ -438,13 +440,13 @@ func (h *SvcSubjectParamsHandler) MtStreamedReplyWithSubjectParamsHandler(

_, nrpcErr := nrpc.CaptureErrors(func() (proto.Message, error) {
err := h.server.MtStreamedReplyWithSubjectParams(ctx, mtParams[0], mtParams[1], func(rep SimpleStringReply){
if err = request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
if err := request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
return nil, err
})
keepStreamAlive.Stop()
Expand Down Expand Up @@ -510,8 +512,23 @@ func (h *SvcSubjectParamsHandler) Handler(msg *nats.Msg) {
}
}
case "mtstreamedreplywithsubjectparams":
h.MtStreamedReplyWithSubjectParamsHandler(h.ctx, request, msg.Data)
return
var mtParams []string
mtParams, request.Encoding, err = nrpc.ParseSubjectTail(2, request.SubjectTail)
if err != nil {
log.Printf("MtStreamedReplyWithSubjectParamsHanlder: MtStreamedReplyWithSubjectParams subject parsing failed: %v", err)
break
}
var req github_com_nats_rpc_nrpc.Void
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
log.Printf("MtStreamedReplyWithSubjectParamsHandler: MtStreamedReplyWithSubjectParams request unmarshal failed: %v", err)
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "bad request received: " + err.Error(),
}
} else {
h.MtStreamedReplyWithSubjectParamsHandler(h.ctx, request, mtParams)
return
}
case "mtnoreply":
request.NoReply = true
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
Expand Down
56 changes: 27 additions & 29 deletions protoc-gen-nrpc/tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,14 @@ func (h *{{$serviceName}}Handler) {{.GetName}}Publish(
{{- if HasStreamedReply .}}
func (h *{{$serviceName}}Handler) {{.GetName}}Handler(
ctx context.Context, request *nrpc.Request, data []byte,
ctx context.Context, request *nrpc.Request
{{- if GetMethodSubjectParams . -}}
, mtParams []string
{{- end -}}
{{- if ne .GetInputType ".nrpc.Void" -}}
, req {{GoType .GetInputType}}
{{- end -}}
) {
{{if ne 0 (len (GetMethodSubjectParams .)) -}}
mtParams
{{- else -}}_{{- end -}}
, encoding, err := nrpc.ParseSubjectTail({{len (GetMethodSubjectParams .)}}, request.SubjectTail)
if err != nil {
log.Printf("{{$serviceName}}: {{.GetName}} subject parsing failed:")
}
request.Encoding = encoding
{{- if ne .GetInputType ".nrpc.Void"}}
var req {{GoType .GetInputType}}
if err := nrpc.Unmarshal(encoding, data, &req); err != nil {
// Handle error
return
}
{{- end}}
ctx, cancel := context.WithCancel(ctx)
keepStreamAlive := nrpc.NewKeepStreamAlive(
Expand All @@ -194,13 +183,13 @@ func (h *{{$serviceName}}Handler) {{.GetName}}Handler(
, req
{{- end -}}
, func(rep {{GoType .GetOutputType}}){
if err = request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
if err := request.SendReply(&rep, nil); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
return nil, err
})
keepStreamAlive.Stop()
Expand Down Expand Up @@ -248,10 +237,7 @@ func (h *{{.GetName}}Handler) Handler(msg *nats.Msg) {
{{- if eq .GetInputType ".nrpc.NoRequest"}}
// {{.GetName}} is a no-request method. Ignore it.
return
{{- else if HasStreamedReply .}}
h.{{.GetName}}Handler(h.ctx, request, msg.Data)
return
{{- else}}{{/* HasStreamedReply */}}
{{- else}}{{/* !NoRequest */}}
{{- if ne 0 (len (GetMethodSubjectParams .))}}
var mtParams []string
{{- end}}
Expand All @@ -275,6 +261,17 @@ func (h *{{.GetName}}Handler) Handler(msg *nats.Msg) {
"{{.GetName}}", request.Encoding, "unmarshal_fail").Inc()
{{- end}}
} else {
{{- if HasStreamedReply .}}
h.{{.GetName}}Handler(h.ctx, request
{{- if GetMethodSubjectParams . -}}
, mtParams
{{- end -}}
{{- if ne .GetInputType ".nrpc.Void" -}}
, req
{{- end -}}
)
return
{{- else }}
request.Handler = func(ctx context.Context)(proto.Message, error){
{{- if eq .GetOutputType ".nrpc.NoReply" -}}
var innerResp nrpc.NoReply
Expand All @@ -296,6 +293,7 @@ func (h *{{.GetName}}Handler) Handler(msg *nats.Msg) {
}
return &innerResp, err
}
{{- end }}
}
{{- end}}{{/* not HasStreamedReply */}}
{{- end}}{{/* range .Method */}}
Expand Down

0 comments on commit c59a3ad

Please sign in to comment.