Skip to content

Commit

Permalink
RPC container content fix (#232)
Browse files Browse the repository at this point in the history
* Fix RPC edge case

* Format code

(cherry picked from commit 5de1fad)
  • Loading branch information
Corey-Zumar authored and dcrankshaw committed Jun 27, 2017
1 parent a3ee768 commit d70348a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
7 changes: 4 additions & 3 deletions containers/python/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def run(self):

t4 = datetime.now()

response.send(socket)
response.send(socket, self.event_history)

print("recv: %f us, parse: %f us, handle: %f us" %
((t2 - t1).microseconds, (t3 - t2).microseconds,
Expand All @@ -313,7 +313,7 @@ def run(self):
else:
feedback_request = FeedbackRequest(msg_id_bytes, [])
response = self.handle_feedback_request(received_msg)
response.send(socket)
response.send(socket, self.event_history)
print("recv: %f us" % ((t2 - t1).microseconds))

sys.stdout.flush()
Expand Down Expand Up @@ -394,13 +394,14 @@ def add_output(self, output):
self.string_content_end_position + output_len] = output
self.string_content_end_position += output_len

def send(self, socket):
def send(self, socket, event_history):
socket.send("", flags=zmq.SNDMORE)
socket.send(
struct.pack("<I", MESSAGE_TYPE_CONTAINER_CONTENT),
flags=zmq.SNDMORE)
socket.send(self.msg_id, flags=zmq.SNDMORE)
socket.send(self.output_buffer[0:self.string_content_end_position])
event_history.insert(EVENT_HISTORY_SENT_CONTAINER_CONTENT)

def expand_buffer_if_necessary(self, size):
if len(self.output_buffer) < size:
Expand Down
14 changes: 7 additions & 7 deletions src/libclipper/src/rpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,13 @@ void RPCService::receive_message(
zmq_connection_id += 1;
}
} break;
case MessageType::ContainerContent:
case MessageType::ContainerContent: {
// This message is a response to a container query
message_t msg_id;
message_t msg_content;
socket.recv(&msg_id, 0);
socket.recv(&msg_content, 0);
if (!new_connection) {
// This message is a response to a container query
message_t msg_id;
message_t msg_content;
socket.recv(&msg_id, 0);
socket.recv(&msg_content, 0);
int id = static_cast<int *>(msg_id.data())[0];
vector<uint8_t> content(
(uint8_t *)msg_content.data(),
Expand All @@ -303,7 +303,7 @@ void RPCService::receive_message(

response_queue_->push(response);
}
break;
} break;
case MessageType::Heartbeat:
send_heartbeat_response(socket, connection_id, new_connection);
break;
Expand Down

0 comments on commit d70348a

Please sign in to comment.