Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
LoW0lf committed Jun 12, 2015
1 parent f80822d commit 8f3431a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 16 deletions.
26 changes: 20 additions & 6 deletions src/Kafka/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ import Kafka.Protocol
import Network.Socket
import qualified Network.Socket.ByteString.Lazy as SBL

-------------------
--Send Functions
------------------
----------------
-- Types
----------------

-- | Pack and encode given Request of Type Req and send via given socket
sendRequest :: Socket -> Req -> IO ()
sendRequest socket req = do
print $ show $ BL.length msg
SBL.sendAll socket msg
where msg = runPut $ buildRqMessage $ pack req

--------------------
--Types
-------------------
-- | Request Type
data Req = Produce Head [ToTopic] | Fetch Head [FromTopic] | Metadata Head [OfTopic]

class Packable a where
Expand All @@ -72,42 +72,55 @@ instance Packable Req where
pack (Fetch head ts) = packFtRqMessage head ts
pack (Metadata head ts) = packMdRqMessage head ts

-- | Header information each request includes
data Head = Head
{ apiV :: Int
, corr :: Int
, client :: BS.ByteString
}

-- | Topic for Produce Request
data ToTopic = ToTopic BS.ByteString [ToPart]
-- | Partition for Produce Request
data ToPart = ToPart Int [Data]
type Data = BS.ByteString

-- | Topic for Fetch Request
data FromTopic = FromTopic BS.ByteString [FromPart]
-- | Partition for Fetch Request
data FromPart = FromPart
{ partId :: Int
, offset :: Int
}

-- | Topic for Metadata Request
data OfTopic = OfTopic BS.ByteString

----------------
-- Converting API
----------------

-- | Convert string to topicsName (ByteString)
stringToTopic :: String -> BS.ByteString
stringToTopic s = BC.pack s

-- | Convert Data.Text to topicsName (ByteString)
textToTopic :: T.Text -> BS.ByteString
textToTopic t = encodeUtf8 t

-- | Convert string to clientId (ByteString)
stringToClientId :: String -> BS.ByteString
stringToClientId s = BC.pack s

-- | Convert Data.Text to clientId (ByteString)
textToClientId :: T.Text -> BS.ByteString
textToClientId t = encodeUtf8 t

-- | Convert string to message data (ByteString)
stringToData :: String -> BS.ByteString
stringToData s = BC.pack s

-- | Convert Data.Text to message data (ByteString)
textToData :: T.Text -> BS.ByteString
textToData t = encodeUtf8 t

Expand All @@ -126,6 +139,7 @@ arrayLength xs = fromIntegral $ length xs
--------------------
--Pack Functions
--------------------

-- | Pack a protocol conform RequestMessage for Produce API
packPrRqMessage :: Head -> [ToTopic] -> RequestMessage
packPrRqMessage head ts = RequestMessage
Expand Down
4 changes: 4 additions & 0 deletions src/Kafka/Protocol/Decode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ License :
Maintainer : mail@marcjuch.li, lorenz.wolf@bluewin.ch
Stability : experimental
Portability : portable
This module exposes functions for decoding requests and responses of kafka
prototol implemention.
-}
module Kafka.Protocol.Decode
( messageSetParser
Expand Down
6 changes: 4 additions & 2 deletions src/Kafka/Protocol/Encode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ License :
Maintainer : mail@marcjuch.li, lorenz.wolf@bluewin.ch
Stability : experimental
Portability : portable
This module exposes Encode functionalities for kafka protocol implementation.
-}
module Kafka.Protocol.Encode
( buildMessageSet
Expand Down Expand Up @@ -92,7 +94,7 @@ buildRqMessage e = do
0 -> buildProduceRequest $ rqRequest e
1 -> buildFetchRequest $ rqRequest e
3 -> buildMetadataRequest $ rqRequest e
-- further API Codes not implemented yet
-- TODO: further API Codes

buildTopic :: (Partition -> Put) -> RqTopic -> Put
buildTopic pb t = do
Expand Down Expand Up @@ -170,7 +172,7 @@ buildRsTopic b t = do
putWord16be $ rsTopicNameLen t
putByteString $ rsTopicName t
putWord32be $ rsNumPayloads t
buildList buildRsPrPayload $ rsPayloads t
buildList b $ rsPayloads t

-- | Produce Response (Pr)
buildRsPrPayload :: RsPayload -> Put
Expand Down
13 changes: 5 additions & 8 deletions src/Kafka/Protocol/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ type Attributes = Word8
data Payload = Payload
{ plMagic :: !Magic
, plAttr :: !Attributes
--, plKeylen :: !BytesLength wolflo: According to Kafka documentation
--the key is type bytes, therefore it should have a len field before. In
--actual produced message from Kafka, there is no field len for key, key is
--fix 32 Bit
--, plKeylen :: !BytesLength
-- TODO: According to Kafka documentation
-- the key is type bytes, therefore it should have a len field before. In
-- actual produced message from Kafka, there is no field len for key. To
-- provide compatibility with tested version keylen is commented out.
, plKey :: !MessageKey
, plValueLen :: !BytesLength
, plValue :: !MessageValue
Expand Down Expand Up @@ -136,10 +137,6 @@ data RequestMessage = RequestMessage
, rqRequest :: Request
} deriving (Show, Eq)

-- NOTE (meiersi): the field accessors generated from your declaration are all
-- partial functions. You can avoid that by introducing proper records for all
-- the constructors and the create 'Request' such that it just tags these
-- individual types.
data Request = ProduceRequest
{ rqPrRequiredAcks :: !RequiredAcks
, rqPrTimeout :: !Timeout
Expand Down

0 comments on commit 8f3431a

Please sign in to comment.