diff --git a/src/Kafka/Client.hs b/src/Kafka/Client.hs index 813c500..d59c4ee 100644 --- a/src/Kafka/Client.hs +++ b/src/Kafka/Client.hs @@ -50,18 +50,18 @@ import Kafka.Protocol import Network.Socket import qualified Network.Socket.ByteString.Lazy as SBL -------------------- ---Send Functions ------------------- +---------------- +-- Types +---------------- + +-- | Pack and encode given Request of Type Req and send via given socket sendRequest :: Socket -> Req -> IO () sendRequest socket req = do print $ show $ BL.length msg SBL.sendAll socket msg where msg = runPut $ buildRqMessage $ pack req --------------------- ---Types -------------------- +-- | Request Type data Req = Produce Head [ToTopic] | Fetch Head [FromTopic] | Metadata Head [OfTopic] class Packable a where @@ -72,42 +72,55 @@ instance Packable Req where pack (Fetch head ts) = packFtRqMessage head ts pack (Metadata head ts) = packMdRqMessage head ts +-- | Header information each request includes data Head = Head { apiV :: Int , corr :: Int , client :: BS.ByteString } +-- | Topic for Produce Request data ToTopic = ToTopic BS.ByteString [ToPart] +-- | Partition for Produce Request data ToPart = ToPart Int [Data] type Data = BS.ByteString +-- | Topic for Fetch Request data FromTopic = FromTopic BS.ByteString [FromPart] +-- | Partition for Fetch Request data FromPart = FromPart { partId :: Int , offset :: Int } +-- | Topic for Metadata Request data OfTopic = OfTopic BS.ByteString ---------------- -- Converting API ---------------- + +-- | Convert string to topicsName (ByteString) stringToTopic :: String -> BS.ByteString stringToTopic s = BC.pack s +-- | Convert Data.Text to topicsName (ByteString) textToTopic :: T.Text -> BS.ByteString textToTopic t = encodeUtf8 t +-- | Convert string to clientId (ByteString) stringToClientId :: String -> BS.ByteString stringToClientId s = BC.pack s +-- | Convert Data.Text to clientId (ByteString) textToClientId :: T.Text -> BS.ByteString textToClientId t = encodeUtf8 t +-- | Convert string to message data (ByteString) stringToData :: String -> BS.ByteString stringToData s = BC.pack s +-- | Convert Data.Text to message data (ByteString) textToData :: T.Text -> BS.ByteString textToData t = encodeUtf8 t @@ -126,6 +139,7 @@ arrayLength xs = fromIntegral $ length xs -------------------- --Pack Functions -------------------- + -- | Pack a protocol conform RequestMessage for Produce API packPrRqMessage :: Head -> [ToTopic] -> RequestMessage packPrRqMessage head ts = RequestMessage diff --git a/src/Kafka/Protocol/Decode.hs b/src/Kafka/Protocol/Decode.hs index c97866d..ba0a9af 100644 --- a/src/Kafka/Protocol/Decode.hs +++ b/src/Kafka/Protocol/Decode.hs @@ -6,6 +6,10 @@ License : Maintainer : mail@marcjuch.li, lorenz.wolf@bluewin.ch Stability : experimental Portability : portable + +This module exposes functions for decoding requests and responses of kafka +prototol implemention. + -} module Kafka.Protocol.Decode ( messageSetParser diff --git a/src/Kafka/Protocol/Encode.hs b/src/Kafka/Protocol/Encode.hs index c0bc3a5..1b4c838 100644 --- a/src/Kafka/Protocol/Encode.hs +++ b/src/Kafka/Protocol/Encode.hs @@ -6,6 +6,8 @@ License : Maintainer : mail@marcjuch.li, lorenz.wolf@bluewin.ch Stability : experimental Portability : portable + +This module exposes Encode functionalities for kafka protocol implementation. -} module Kafka.Protocol.Encode ( buildMessageSet @@ -92,7 +94,7 @@ buildRqMessage e = do 0 -> buildProduceRequest $ rqRequest e 1 -> buildFetchRequest $ rqRequest e 3 -> buildMetadataRequest $ rqRequest e - -- further API Codes not implemented yet + -- TODO: further API Codes buildTopic :: (Partition -> Put) -> RqTopic -> Put buildTopic pb t = do @@ -170,7 +172,7 @@ buildRsTopic b t = do putWord16be $ rsTopicNameLen t putByteString $ rsTopicName t putWord32be $ rsNumPayloads t - buildList buildRsPrPayload $ rsPayloads t + buildList b $ rsPayloads t -- | Produce Response (Pr) buildRsPrPayload :: RsPayload -> Put diff --git a/src/Kafka/Protocol/Types.hs b/src/Kafka/Protocol/Types.hs index ba8b803..8854a84 100644 --- a/src/Kafka/Protocol/Types.hs +++ b/src/Kafka/Protocol/Types.hs @@ -79,10 +79,11 @@ type Attributes = Word8 data Payload = Payload { plMagic :: !Magic , plAttr :: !Attributes - --, plKeylen :: !BytesLength wolflo: According to Kafka documentation - --the key is type bytes, therefore it should have a len field before. In - --actual produced message from Kafka, there is no field len for key, key is - --fix 32 Bit + --, plKeylen :: !BytesLength + -- TODO: According to Kafka documentation + -- the key is type bytes, therefore it should have a len field before. In + -- actual produced message from Kafka, there is no field len for key. To + -- provide compatibility with tested version keylen is commented out. , plKey :: !MessageKey , plValueLen :: !BytesLength , plValue :: !MessageValue @@ -136,10 +137,6 @@ data RequestMessage = RequestMessage , rqRequest :: Request } deriving (Show, Eq) --- NOTE (meiersi): the field accessors generated from your declaration are all --- partial functions. You can avoid that by introducing proper records for all --- the constructors and the create 'Request' such that it just tags these --- individual types. data Request = ProduceRequest { rqPrRequiredAcks :: !RequiredAcks , rqPrTimeout :: !Timeout