
new project

herry 4 years ago
commit
745806e034
16 changed files with 1082 additions and 0 deletions
  1. .gitignore (+17 -0)
  2. LICENSE.md (+22 -0)
  3. Project.toml (+17 -0)
  4. README.md (+107 -0)
  5. REQUIRE (+1 -0)
  6. appveyor.yml (+45 -0)
  7. docker-compose.yml (+83 -0)
  8. src/RDKafka.jl (+14 -0)
  9. src/client.jl (+117 -0)
  10. src/consumer.jl (+90 -0)
  11. src/core.jl (+5 -0)
  12. src/producer.jl (+43 -0)
  13. src/utils.jl (+9 -0)
  14. src/wrapper.jl (+264 -0)
  15. src/wrapper_consts.jl (+218 -0)
  16. test/runtests.jl (+30 -0)

+ 17 - 0
.gitignore

@@ -0,0 +1,17 @@
+*.jl.cov
+*.jl.*.cov
+*.jl.mem
+
+# build
+*.so
+deps/usr
+build.log
+*.zip
+
+# editor files, history
+*~
+.juliahistory
+
+# intentionally invisible files
+_*
+Manifest.toml

+ 22 - 0
LICENSE.md

@@ -0,0 +1,22 @@
+The RDKafka.jl package is licensed under the MIT "Expat" License:
+
+> Copyright (c) 2018: Andrei Zhabinski.
+>
+> Permission is hereby granted, free of charge, to any person obtaining a copy
+> of this software and associated documentation files (the "Software"), to deal
+> in the Software without restriction, including without limitation the rights
+> to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+> copies of the Software, and to permit persons to whom the Software is
+> furnished to do so, subject to the following conditions:
+>
+> The above copyright notice and this permission notice shall be included in all
+> copies or substantial portions of the Software.
+>
+> THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+> IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+> FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+> AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+> LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+> OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+> SOFTWARE.
+>

+ 17 - 0
Project.toml

@@ -0,0 +1,17 @@
+name = "RDKafka"
+uuid = "43e2f499-f4c7-585f-8317-cbc2d9c3bf8f"
+authors = ["Andrei Zhabinski <[email protected]>"]
+version = "0.1.0"
+
+[deps]
+librdkafka_jll = "7943bfb0-7437-5acd-a008-22777931c7aa"
+
+[compat]
+julia = "1.3"
+librdkafka_jll = "1.5"
+
+[extras]
+Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+
+[targets]
+test = ["Test"]

+ 107 - 0
README.md

@@ -0,0 +1,107 @@
+# RDKafka
+
+Julia wrapper for [librdkafka](https://github.com/edenhill/librdkafka).
+
+## Build
+
+```julia
+using Pkg
+Pkg.add("RDKafka")
+```
+
+Prebuilt binaries of the `librdkafka` native library are downloaded automatically. Binaries are available for all supported Julia platforms.
+
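+To check that the wrapped library loads correctly, you can query its version through the low-level wrapper (a quick sanity check; `rd_kafka_version` is defined in `src/wrapper.jl`):
+
+```julia
+using RDKafka
+
+# librdkafka encodes its version as 0xMMmmrrpp, e.g. 0x010500ff for 1.5.0
+v = RDKafka.rd_kafka_version()
+println(string(v, base = 16))
+```
+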
+## Usage
+
+### Start Kafka server
+
+If you don't have one already started, download the Kafka server and run it according to the official [QuickStart Guide](https://kafka.apache.org/quickstart). Here's a short version of that guide:
+
+`cd` to the Kafka folder and run the following commands in two different terminals:
+
+```
+# start ZooKeeper server
+bin/zookeeper-server-start.sh config/zookeeper.properties
+# start Kafka broker
+bin/kafka-server-start.sh config/server.properties
+```
+
+In yet another terminal create a topic:
+
+```
+# create topic
+bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
+# describe it
+bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
+```
+
+### Produce and consume some messages
+
+Now, in a Julia console, start polling using a `KafkaConsumer`:
+
+```julia
+using RDKafka
+
+c = KafkaConsumer("localhost:9092", "my-consumer-group")
+parlist = [("quickstart-events", 0)]
+subscribe(c, parlist)
+timeout_ms = 1000
+while true
+    msg = poll(String, String, c, timeout_ms)
+    @show(msg)
+end
+```
+
+And produce a few messages using a `KafkaProducer`:
+
+```julia
+using RDKafka
+import RDKafka.produce
+
+p = KafkaProducer("localhost:9092")
+partition = 0
+produce(p, "quickstart-events", partition, "message key", "message payload")
+```
+
+In the consumer window you should see something like:
+
+```
+msg = nothing
+msg = Message(message key: message payload)
+msg = nothing
+msg = nothing
+```
+where `nothing` means that there were no new messages in that polling interval, while `Message(...)` is the actual message we sent from the producer.
+
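+If you omit the key and payload types, `poll` returns the raw bytes; this mirrors the default method in `src/consumer.jl` (a minimal sketch):
+
+```julia
+msg = poll(c, timeout_ms)  # Message{Vector{UInt8}, Vector{UInt8}} or nothing
+msg !== nothing && msg.err == 0 && println(String(msg.payload))
+```
+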
+### Configuration
+
+`librdkafka` is highly customizable; see [CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for the list of supported properties. To set a particular property, pass a `conf` object to `KafkaProducer` or `KafkaConsumer`, e.g.:
+
+```julia
+conf = Dict("socket.timeout.ms" => 300000)
+p = KafkaProducer("localhost:9092", conf)
+```
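+
+The same `conf` dictionary works for a consumer as well. For example, to start reading from the beginning of a topic when the consumer group has no committed offsets (assuming the librdkafka property `auto.offset.reset`):
+
+```julia
+conf = Dict("auto.offset.reset" => "earliest")
+c = KafkaConsumer("localhost:9092", "my-consumer-group", conf)
+```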
+
+### Error handling
+
+Pass the `err_cb` keyword argument to either `KafkaConsumer` or `KafkaProducer`. The callback is called with an integer error code and a reason string:
+
+```julia
+c = KafkaConsumer("localhost:9092", "my-consumer-group", err_cb=(err, reason) -> error(reason))
+```
+
+### Delivery reports
+
+Pass the `dr_cb` keyword argument to a `KafkaProducer`. The callback receives the delivered `Message`; a non-zero `err` field means delivery failed. Reports are dispatched from a background polling task that the client starts automatically.
+
+```julia
+p = KafkaProducer("localhost:9092", dr_cb=msg -> msg.err != 0 && error("Delivery failed"))
+```
+
+### Seeking
+
+`RDKafka.seek` moves all of the consumer's assigned partitions to the offsets corresponding to a given timestamp (in milliseconds since the Unix epoch):
+
+```julia
+c = KafkaConsumer("localhost:9092", "my-consumer-group")
+timestamp_ms = round(Int64, time() * 1000) - 60_000  # e.g. one minute back
+timeout_ms = 1000
+RDKafka.seek(c, timestamp_ms, timeout_ms)
+```
+

+ 1 - 0
REQUIRE

@@ -0,0 +1 @@
+julia 0.6

+ 45 - 0
appveyor.yml

@@ -0,0 +1,45 @@
+environment:
+  matrix:
+  - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
+  - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"
+
+## uncomment the following lines to allow failures on nightly julia
+## (tests will run but not make your overall status red)
+#matrix:
+#  allow_failures:
+#  - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
+#  - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"
+
+branches:
+  only:
+    - master
+    - /release-.*/
+
+notifications:
+  - provider: Email
+    on_build_success: false
+    on_build_failure: false
+    on_build_status_changed: false
+
+install:
+  - ps: "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12"
+# If there's a newer build queued for the same PR, cancel this one
+  - ps: if ($env:APPVEYOR_PULL_REQUEST_NUMBER -and $env:APPVEYOR_BUILD_NUMBER -ne ((Invoke-RestMethod `
+        https://ci.appveyor.com/api/projects/$env:APPVEYOR_ACCOUNT_NAME/$env:APPVEYOR_PROJECT_SLUG/history?recordsNumber=50).builds | `
+        Where-Object pullRequestId -eq $env:APPVEYOR_PULL_REQUEST_NUMBER)[0].buildNumber) { `
+        throw "There are newer queued builds for this pull request, failing early." }
+# Download most recent Julia Windows binary
+  - ps: (new-object net.webclient).DownloadFile(
+        $env:JULIA_URL,
+        "C:\projects\julia-binary.exe")
+# Run installer silently, output to C:\projects\julia
+  - C:\projects\julia-binary.exe /S /D=C:\projects\julia
+
+build_script:
+# Need to convert from shallow to complete for Pkg.clone to work
+  - IF EXIST .git\shallow (git fetch --unshallow)
+  - C:\projects\julia\bin\julia -e "versioninfo();
+      Pkg.clone(pwd(), \"RDKafka\"); Pkg.build(\"RDKafka\")"
+
+test_script:
+  - C:\projects\julia\bin\julia -e "Pkg.test(\"RDKafka\")"

+ 83 - 0
docker-compose.yml

@@ -0,0 +1,83 @@
+---
+version: '2'
+services:
+  zookeeper-1:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_SERVER_ID: 1
+      ZOOKEEPER_CLIENT_PORT: 22181
+      ZOOKEEPER_TICK_TIME: 2000
+      ZOOKEEPER_INIT_LIMIT: 5
+      ZOOKEEPER_SYNC_LIMIT: 2
+      ZOOKEEPER_SERVERS: localhost:22888:23888;localhost:32888:33888;localhost:42888:43888
+    network_mode: host
+    extra_hosts:
+      - "moby:127.0.0.1"
+
+  zookeeper-2:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_SERVER_ID: 2
+      ZOOKEEPER_CLIENT_PORT: 32181
+      ZOOKEEPER_TICK_TIME: 2000
+      ZOOKEEPER_INIT_LIMIT: 5
+      ZOOKEEPER_SYNC_LIMIT: 2
+      ZOOKEEPER_SERVERS: localhost:22888:23888;localhost:32888:33888;localhost:42888:43888
+    network_mode: host
+    extra_hosts:
+      - "moby:127.0.0.1"
+
+  zookeeper-3:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_SERVER_ID: 3
+      ZOOKEEPER_CLIENT_PORT: 42181
+      ZOOKEEPER_TICK_TIME: 2000
+      ZOOKEEPER_INIT_LIMIT: 5
+      ZOOKEEPER_SYNC_LIMIT: 2
+      ZOOKEEPER_SERVERS: localhost:22888:23888;localhost:32888:33888;localhost:42888:43888
+    network_mode: host
+    extra_hosts:
+      - "moby:127.0.0.1"
+
+  kafka-1:
+    image: confluentinc/cp-kafka:latest
+    network_mode: host
+    depends_on:
+      - zookeeper-1
+      - zookeeper-2
+      - zookeeper-3
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: localhost:22181,localhost:32181,localhost:42181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092
+    extra_hosts:
+      - "moby:127.0.0.1"
+
+  kafka-2:
+    image: confluentinc/cp-kafka:latest
+    network_mode: host
+    depends_on:
+      - zookeeper-1
+      - zookeeper-2
+      - zookeeper-3
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: localhost:22181,localhost:32181,localhost:42181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
+    extra_hosts:
+      - "moby:127.0.0.1"
+
+  kafka-3:
+    image: confluentinc/cp-kafka:latest
+    network_mode: host
+    depends_on:
+      - zookeeper-1
+      - zookeeper-2
+      - zookeeper-3
+    environment:
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: localhost:22181,localhost:32181,localhost:42181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:39092
+    extra_hosts:
+      - "moby:127.0.0.1"

+ 14 - 0
src/RDKafka.jl

@@ -0,0 +1,14 @@
+module RDKafka
+using librdkafka_jll
+
+export KafkaProducer,
+    KafkaConsumer,
+    KafkaClient,
+    produce,
+    subscribe,
+    seek,
+    poll
+
+include("core.jl")
+
+end # module

+ 117 - 0
src/client.jl

@@ -0,0 +1,117 @@
+################################################################################
+##                                 CALLBACKS                                  ##
+################################################################################
+
+# librdkafka provides several callback (CB) hooks, e.g. the delivery report CB and the error CB.
+# We define fixed @cfunction callbacks to pass to C at compile time; these @cfunction's then
+# dispatch to the user-supplied Julia functions stored in the XXX_CALLBACKS dictionaries.
+const DELIVERY_CALLBACKS = Dict{Ptr{Cvoid}, Function}()
+function delivery_callback(rk::Ptr{Cvoid}, rkmessage::Ptr{CKafkaMessage}, opaque::Ptr{Cvoid})::Cvoid
+    msg = Message(unsafe_load(rkmessage))
+    if haskey(DELIVERY_CALLBACKS, rk)
+        cb = DELIVERY_CALLBACKS[rk]
+        cb(msg)
+    end
+end
+
+
+# error callbacks are called with the arguments (err::Cint, reason::String)
+function error_callback(rk::Ptr{Cvoid}, err::Cint, reason::Ptr{Cchar}, opaque::Ptr{Cvoid})::Cvoid
+    reason = unsafe_string(reason)
+    if haskey(ERROR_CALLBACKS, rk)
+        cb = ERROR_CALLBACKS[rk]
+        cb(err, reason)
+    end
+end
+const ERROR_CALLBACKS = Dict{Ptr, Function}()
+
+
+mutable struct KafkaClient
+    conf::Dict{Any, Any}
+    typ::Cint
+    rk::Ptr{Cvoid}
+end
+
+
+function KafkaClient(typ::Integer, conf::Dict=Dict(); dr_cb=nothing, err_cb=nothing)
+    c_conf = kafka_conf_new()
+    for (k, v) in conf
+        kafka_conf_set(c_conf, string(k), string(v))
+    end
+    # set callbacks in config before creating rk
+    if dr_cb !== nothing
+        kafka_conf_set_dr_msg_cb(c_conf, @cfunction(delivery_callback, Cvoid, (Ptr{Cvoid}, Ptr{CKafkaMessage}, Ptr{Cvoid})))
+    end
+    if err_cb !== nothing
+        kafka_conf_set_error_cb(c_conf, @cfunction(error_callback, Cvoid, (Ptr{Cvoid}, Cint, Ptr{Cchar}, Ptr{Cvoid})))
+    end
+    rk = kafka_new(c_conf, Cint(typ))
+    client = KafkaClient(conf, typ, rk)
+    polling = dr_cb !== nothing || err_cb !== nothing
+    # seems like `kafka_destroy` also destroys its config, so we don't attempt it twice
+    finalizer(client -> begin 
+        polling = false
+        kafka_destroy(rk)
+    end, client)
+    if dr_cb !== nothing
+        # set Julia callback after rk is created
+        DELIVERY_CALLBACKS[rk] = dr_cb
+    end
+    if err_cb !== nothing
+        ERROR_CALLBACKS[rk] = err_cb
+    end
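+    # background task: keep serving delivery reports and error callbacks;
+    # the finalizer flips `polling` to false, which stops the loop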
+    @async while polling
+        kafka_poll(rk, 0)
+        sleep(1)
+    end
+    return client
+end
+
+
+Base.show(io::IO, kc::KafkaClient) = print(io, "KafkaClient($(kc.typ))")
+
+
+mutable struct KafkaTopic
+    conf::Dict{Any, Any}
+    topic::String
+    rkt::Ptr{Cvoid}
+end
+
+function KafkaTopic(kc::KafkaClient, topic::String, conf::Dict=Dict())
+    c_conf = kafka_topic_conf_new()
+    for (k, v) in conf
+        kafka_topic_conf_set(c_conf, string(k), string(v))
+    end
+    rkt = kafka_topic_new(kc.rk, topic, c_conf)
+    topic = KafkaTopic(conf, topic, rkt)
+    finalizer(topic -> (kafka_topic_destroy(rkt)), topic)
+    return topic
+end
+
+Base.show(io::IO, kt::KafkaTopic) = print(io, "KafkaTopic($(kt.topic))")
+
+
+struct Message{K,P}
+    err::Int
+    topic::KafkaTopic
+    partition::Int32
+    key::Union{K, Nothing}
+    payload::Union{P, Nothing}
+    offset::Int64
+end
+
+Base.convert(::Type{String}, data::Vector{UInt8}) = String(data)
+
+function Message{K,P}(c_msg::CKafkaMessage) where {K,P}
+    topic = KafkaTopic(Dict(), "<unknown>", c_msg.rkt)
+    if c_msg.err == 0
+        key = convert(K, unsafe_load_array(c_msg.key, c_msg.key_len))
+        payload = convert(P, unsafe_load_array(c_msg.payload, c_msg.len))
+        return Message(Int(c_msg.err), topic, c_msg.partition, key, payload, c_msg.offset)
+    else
+        return Message{K,P}(Int(c_msg.err), topic, c_msg.partition, nothing, nothing, c_msg.offset)
+    end
+end
+Message(c_msg::CKafkaMessage) = Message{Vector{UInt8}, Vector{UInt8}}(c_msg)
+
+Base.show(io::IO, msg::Message) = print(io, "Message($(msg.key): $(msg.payload))")

+ 90 - 0
src/consumer.jl

@@ -0,0 +1,90 @@
+## consumer
+
+mutable struct PartitionList
+    rkparlist::Ptr{CKafkaTopicPartitionList}
+end
+
+function PartitionList()
+    ptr = kafka_topic_partition_list_new()
+    parlist = PartitionList(ptr)
+    finalizer(parlist -> kafka_topic_partition_list_destroy(ptr), parlist)
+    return parlist
+end
+
+
+mutable struct KafkaConsumer
+    client::KafkaClient
+    parlist::PartitionList
+end
+
+
+function KafkaConsumer(conf::Dict; dr_cb=nothing, err_cb=nothing)
+    @assert haskey(conf, "bootstrap.servers") "`bootstrap.servers` should be specified in conf"
+    @assert haskey(conf, "group.id") "`group.id` should be specified in conf"
+    client = KafkaClient(KAFKA_TYPE_CONSUMER, conf; dr_cb=dr_cb, err_cb=err_cb)
+    parlist = PartitionList()
+    return KafkaConsumer(client, parlist)
+end
+
+
+function KafkaConsumer(bootstrap_servers::String, group_id::String, conf::Dict=Dict(); dr_cb=nothing, err_cb=nothing)
+    conf["bootstrap.servers"] = bootstrap_servers
+    conf["group.id"] = group_id
+    return KafkaConsumer(conf; dr_cb=dr_cb, err_cb=err_cb)
+end
+
+
+function Base.show(io::IO, c::KafkaConsumer)
+    group_id = c.client.conf["group.id"]
+    bootstrap_servers = c.client.conf["bootstrap.servers"]
+    print(io, "KafkaConsumer($group_id @ $bootstrap_servers)")
+end
+
+
+function subscribe(c::KafkaConsumer, topic_partitions::Vector{Tuple{String, Int}})
+    rkparlist = c.parlist.rkparlist
+    for (topic, partition) in topic_partitions
+        kafka_topic_partition_list_add(rkparlist, topic, partition)
+    end
+    kafka_subscribe(c.client.rk, rkparlist)
+end
+
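+# Note: rd_kafka_offsets_for_times() reads the `offset` field of each partition
+# as an input timestamp (ms since epoch) and replaces it with the offset of the
+# first message whose timestamp is >= that value.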
+function offsets_for_timestamp(c::KafkaConsumer, timestamp::Int64, timeout::Int=1000)
+    rkparlist = c.parlist.rkparlist
+    kafka_assignment(c.client.rk, rkparlist)
+    tpl = unsafe_load(rkparlist)::CKafkaTopicPartitionList
+    for i in 1:tpl.cnt
+        e = unsafe_load(tpl.elems, i)
+        e.offset = timestamp
+        unsafe_store!(tpl.elems, e, i)
+    end
+    kafka_offsets_for_times(c.client.rk, rkparlist, timeout)
+end
+
+function seek(c::KafkaConsumer, timestamp::Int64, timeout::Int=1000)
+    offsets_for_timestamp(c, timestamp, timeout)
+    rkparlist = c.parlist.rkparlist
+    kafka_assign(c.client.rk, rkparlist)
+    tpl = unsafe_load(rkparlist)::CKafkaTopicPartitionList
+    for i in 1:tpl.cnt
+        e = unsafe_load(tpl.elems, i)
+        topic = unsafe_string(e.topic)
+        kt = KafkaTopic(c.client, topic, Dict())
+        kafka_seek(kt.rkt, e.partition, e.offset, timeout)
+    end
+end
+
+function poll(::Type{K}, ::Type{P}, c::KafkaConsumer, timeout::Int=1000) where {K,P}
+    c_msg_ptr = kafka_consumer_poll(c.client.rk, timeout)
+    if c_msg_ptr !== nothing
+        c_msg = unsafe_load(c_msg_ptr)
+        msg = Message{K,P}(c_msg)
+        kafka_message_destroy(c_msg_ptr)
+        return msg
+    else
+        return nothing
+    end
+end
+
+
+poll(c::KafkaConsumer, timeout::Int=1000) = poll(Vector{UInt8}, Vector{UInt8}, c, timeout)

+ 5 - 0
src/core.jl

@@ -0,0 +1,5 @@
+include("utils.jl")
+include("wrapper.jl")
+include("client.jl")
+include("producer.jl")
+include("consumer.jl")

+ 43 - 0
src/producer.jl

@@ -0,0 +1,43 @@
+## KafkaProducer
+
+mutable struct KafkaProducer
+    client::KafkaClient
+    topics::Dict{String, KafkaTopic}
+end
+
+
+function KafkaProducer(conf::Dict; dr_cb=nothing, err_cb=nothing)
+    kc = KafkaClient(KAFKA_TYPE_PRODUCER, conf; dr_cb=dr_cb, err_cb=err_cb)
+    return KafkaProducer(kc, Dict())
+end
+
+
+function KafkaProducer(bootstrap_servers::String, conf::Dict=Dict(); dr_cb=nothing, err_cb=nothing)
+    conf["bootstrap.servers"] = bootstrap_servers
+    return KafkaProducer(conf; dr_cb=dr_cb, err_cb=err_cb)
+end
+
+function Base.show(io::IO, p::KafkaProducer)
+    bootstrap_servers = p.client.conf["bootstrap.servers"]
+    print(io, "KafkaProducer($bootstrap_servers)")
+end
+
+
+function produce(kt::KafkaTopic, partition::Integer, key, payload)
+    # produce(kt.rkt, partition, convert(Vector{UInt8}, key), convert(Vector{UInt8}, payload))
+    produce(kt.rkt, partition, Vector{UInt8}(key), Vector{UInt8}(payload))
+end
+
+
+function produce(p::KafkaProducer, topic::String, partition::Integer, key, payload)
+    if !haskey(p.topics, topic)
+        p.topics[topic] = KafkaTopic(p.client, topic, Dict())
+    end
+    produce(p.topics[topic], partition, key, payload)
+end
+
+
+function produce(p::KafkaProducer, topic::String, key, payload)
+    partition_unassigned = -1
+    produce(p, topic, partition_unassigned, key, payload)
+end

+ 9 - 0
src/utils.jl

@@ -0,0 +1,9 @@
+
+function unsafe_load_array(p::Ptr{T}, len::Integer) where T
+    @assert T != Nothing "Cannot load array from Ptr{Nothing}, perhaps you meant to use a typed pointer?"
+    a = Array{T}(undef, len)
+    for i in 1:len
+        a[i] = unsafe_load(p, i)
+    end
+    return a
+end

+ 264 - 0
src/wrapper.jl

@@ -0,0 +1,264 @@
+## wrapper for librdkafka C API, see:
+## https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h
+
+function rd_kafka_version()
+    ccall((:rd_kafka_version, RDKafka.librdkafka), Cint, ())
+end
+
+## rd_kafka_conf_t
+
+function kafka_conf_new()
+    return ccall((:rd_kafka_conf_new, librdkafka), Ptr{Cvoid}, ())
+end
+
+function kafka_conf_destroy(conf::Ptr{Cvoid})
+    ccall((:rd_kafka_conf_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), conf)
+end
+
+
+function kafka_conf_set(conf::Ptr{Cvoid}, key::String, val::String)
+    err_str = zeros(UInt8, 512)
+    return ccall((:rd_kafka_conf_set, librdkafka), Cint,
+                 (Ptr{Cvoid}, Cstring, Cstring, Ptr{UInt8}, Csize_t),
+                 conf, key, val, pointer(err_str), sizeof(err_str))
+end
+
+
+function kafka_conf_get(conf::Ptr{Cvoid}, key::String)
+    dest = zeros(UInt8, 512)
+    sz_ref = Ref{Csize_t}(0)
+    ccall((:rd_kafka_conf_get, librdkafka), Cvoid,
+          (Ptr{Cvoid}, Cstring, Ptr{UInt8}, Ptr{Csize_t}),
+          conf, key, pointer(dest), sz_ref)
+    return unsafe_string(pointer(dest))
+end
+
+
+function kafka_conf_set_error_cb(conf::Ptr{Cvoid}, c_fn::Ptr{Cvoid})
+    ccall((:rd_kafka_conf_set_error_cb, librdkafka), Cvoid,
+          (Ptr{Cvoid}, Ptr{Cvoid}), conf, c_fn)
+end
+
+function kafka_conf_set_dr_msg_cb(conf::Ptr{Cvoid}, c_fn::Ptr{Cvoid})
+    ccall((:rd_kafka_conf_set_dr_msg_cb, librdkafka), Cvoid,
+          (Ptr{Cvoid}, Ptr{Cvoid}), conf, c_fn)
+end
+
+
+## rd_kafka_t
+
+const KAFKA_TYPE_PRODUCER = Cint(0)
+const KAFKA_TYPE_CONSUMER = Cint(1)
+
+
+function kafka_new(conf::Ptr{Cvoid}, kafka_type::Cint)
+    err_str = zeros(UInt8, 512)
+    client = ccall((:rd_kafka_new, librdkafka),
+                   Ptr{Cvoid},
+                   (Cint, Ptr{Cvoid}, Ptr{UInt8}, Csize_t),
+                   kafka_type, conf, pointer(err_str), sizeof(err_str))
+    return client
+end
+
+
+function kafka_destroy(rk::Ptr{Cvoid})
+    ccall((:rd_kafka_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), rk)
+end
+
+
+## rd_kafka_topic_conf_t
+
+function kafka_topic_conf_new()
+    return ccall((:rd_kafka_topic_conf_new, librdkafka), Ptr{Cvoid}, ())
+end
+
+
+function kafka_topic_conf_destroy(conf::Ptr{Cvoid})
+    ccall((:rd_kafka_topic_conf_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), conf)
+end
+
+
+function kafka_topic_conf_set(conf::Ptr{Cvoid}, key::String, val::String)
+    err_str = Array{UInt8}(undef, 512)
+    return ccall((:rd_kafka_topic_conf_set, librdkafka), Cint,
+                 (Ptr{Cvoid}, Cstring, Cstring, Ptr{UInt8}, Csize_t),
+                 conf, key, val, pointer(err_str), sizeof(err_str))
+end
+
+
+function kafka_topic_conf_get(conf::Ptr{Cvoid}, key::String)
+    dest = Array{UInt8}(undef, 512)
+    sz_ref = Ref{Csize_t}(0)
+    ccall((:rd_kafka_topic_conf_get, librdkafka), Cvoid,
+          (Ptr{Cvoid}, Cstring, Ptr{UInt8}, Ptr{Csize_t}),
+          conf, key, pointer(dest), sz_ref)
+    return unsafe_string(pointer(dest))
+end
+
+
+# rd_kafka_topic_t
+
+function kafka_topic_new(rk::Ptr{Cvoid}, topic::String, topic_conf::Ptr{Cvoid})
+    return ccall((:rd_kafka_topic_new, librdkafka), Ptr{Cvoid},
+                 (Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
+                 rk, topic, topic_conf)
+end
+
+
+function kafka_topic_destroy(rkt::Ptr{Cvoid})
+    ccall((:rd_kafka_topic_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), rkt)
+end
+
+
+function kafka_poll(rk::Ptr{Cvoid}, timeout::Integer)
+    return ccall((:rd_kafka_poll, librdkafka), Cint,
+                 (Ptr{Cvoid}, Cint),
+                 rk, timeout)
+end
+
+
+function produce(rkt::Ptr{Cvoid}, partition::Integer,
+                 key::Vector{UInt8}, payload::Vector{UInt8})
+    flags = Cint(0)
+    errcode = ccall((:rd_kafka_produce, librdkafka), Cint,
+                    (Ptr{Cvoid}, Int32, Cint,
+                     Ptr{Cvoid}, Csize_t,
+                     Ptr{Cvoid}, Csize_t,
+                     Ptr{Cvoid}),
+                    rkt, Int32(partition), flags,
+                    pointer(payload), length(payload),
+                    pointer(key), length(key),
+                    C_NULL)   
+
+    if errcode != 0
+        # rd_kafka_produce() returns -1 and sets the C global errno on failure
+        errnr = unsafe_load(cglobal(:errno, Int32))
+        errmsg = unsafe_string(ccall(:strerror, Cstring, (Int32,), errnr))
+        error("Produce request failed: $errmsg")
+    end
+end
+
+
+
+## topic partition list
+
+mutable struct CKafkaTopicPartition
+    topic::Cstring
+    partition::Cint
+    offset::Clonglong
+    metadata::Ptr{Cvoid}
+    metadata_size::Csize_t
+    opaque::Ptr{Cvoid}
+    err::Cint
+    _private::Ptr{Cvoid}
+end
+mutable struct CKafkaTopicPartitionList
+    cnt::Cint
+    size::Cint
+    elems::Ptr{CKafkaTopicPartition}
+end
+
+function kafka_topic_partition_list_new(sz::Integer=0)
+    rkparlist = ccall((:rd_kafka_topic_partition_list_new, librdkafka), Ptr{CKafkaTopicPartitionList},
+                       (Cint,), sz)
+    if rkparlist != Ptr{CKafkaTopicPartitionList}(0)
+        return rkparlist
+    end
+    return nothing
+end
+
+
+function kafka_topic_partition_list_destroy(rkparlist::Ptr{CKafkaTopicPartitionList})
+    ccall((:rd_kafka_topic_partition_list_destroy, librdkafka), Cvoid, (Ptr{CKafkaTopicPartitionList},), rkparlist)
+end
+
+
+function kafka_topic_partition_list_add(rkparlist::Ptr{CKafkaTopicPartitionList},
+                                        topic::String, partition::Integer)
+    ccall((:rd_kafka_topic_partition_list_add, librdkafka), Ptr{CKafkaTopicPartition},
+          (Ptr{CKafkaTopicPartitionList}, Cstring, Int32,), rkparlist, topic, partition)
+end
+
+
+## partition assignment
+
+function kafka_assignment(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList})
+    errcode = ccall((:rd_kafka_assignment, librdkafka), Cint,
+                    (Ptr{Cvoid}, Ref{Ptr{CKafkaTopicPartitionList}}), rk, rkparlist)
+    if errcode != 0
+        error("Assignment retrieval failed with error $errcode")
+    end
+end
+    
+
+## subscribe
+
+function kafka_subscribe(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList})
+    errcode = ccall((:rd_kafka_subscribe, librdkafka), Cint,
+                    (Ptr{Cvoid}, Ptr{Cvoid}), rk, rkparlist)
+    if errcode != 0
+        error("Subscription failed with error $errcode")
+    end
+end
+
+
+## seek
+
+function kafka_offsets_for_times(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList}, timeout::Integer)
+    errcode = ccall((:rd_kafka_offsets_for_times, librdkafka), Cint,
+                    (Ptr{Cvoid}, Ptr{CKafkaTopicPartitionList}, Cint), rk, rkparlist, timeout)
+    if errcode != 0
+        error("kafka_offsets_for_times failed with error $errcode")
+    end
+end
+
+function kafka_seek(rkt::Ptr{Cvoid}, partition::Int32, offset::Int64, timeout::Int=1000)
+    errcode = ccall((:rd_kafka_seek, librdkafka), Cint,
+                    (Ptr{Cvoid}, Cint, Clonglong, Cint), 
+                    rkt, partition, offset, timeout)
+    if errcode != 0
+        error("kafka_seek failed with error $errcode")
+    end
+end
+
+function kafka_assign(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList})
+    errcode = ccall((:rd_kafka_assign, librdkafka), Cint,
+                    (Ptr{Cvoid}, Ptr{CKafkaTopicPartitionList}), 
+                    rk, rkparlist)
+    if errcode != 0
+        error("kafka_assign failed with error $errcode")
+    end
+end
+
+
+struct CKafkaMessage
+    err::Cint
+    rkt::Ptr{Cvoid}
+    partition::Int32
+    payload::Ptr{UInt8}
+    len::Csize_t
+    key::Ptr{UInt8}
+    key_len::Csize_t
+    offset::Int64
+    _private::Ptr{Cvoid}
+end
+
+
+function kafka_consumer_poll(rk::Ptr{Cvoid}, timeout::Integer)
+    msg_ptr = ccall((:rd_kafka_consumer_poll, librdkafka), Ptr{CKafkaMessage},
+                    (Ptr{Cvoid}, Cint), rk, timeout)
+    if msg_ptr != Ptr{CKafkaMessage}(0)
+        return msg_ptr
+    else
+        return nothing
+    end
+end
+
+
+function kafka_message_destroy(msg_ptr::Ptr{CKafkaMessage})
+    ccall((:rd_kafka_message_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), msg_ptr)
+end

+ 218 - 0
src/wrapper_consts.jl

@@ -0,0 +1,218 @@
+#  Internal errors to rdkafka:
+#  Begin internal error codes
+const RD_KAFKA_RESP_ERR__BEGIN = -200
+#  Received message is incorrect
+const RD_KAFKA_RESP_ERR__BAD_MSG = -199
+#  Bad/unknown compression
+const RD_KAFKA_RESP_ERR__BAD_COMPRESSION = -198
+#  Broker is going away
+const RD_KAFKA_RESP_ERR__DESTROY = -197
+#  Generic failure
+const RD_KAFKA_RESP_ERR__FAIL = -196
+#  Broker transport failure
+const RD_KAFKA_RESP_ERR__TRANSPORT = -195
+#  Critical system resource
+const RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE = -194
+#  Failed to resolve broker
+const RD_KAFKA_RESP_ERR__RESOLVE = -193
+#  Produced message timed out
+const RD_KAFKA_RESP_ERR__MSG_TIMED_OUT = -192
+#  Reached the end of the topic+partition queue on
+#  the broker. Not really an error.
+const RD_KAFKA_RESP_ERR__PARTITION_EOF = -191
+#  Permanent: Partition does not exist in cluster.
+const RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION = -190
+#  File or filesystem error
+const RD_KAFKA_RESP_ERR__FS = -189
+#  Permanent: Topic does not exist in cluster.
+const RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC = -188
+#  All broker connections are down.
+const RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN = -187
+#  Invalid argument, or invalid configuration
+const RD_KAFKA_RESP_ERR__INVALID_ARG = -186
+#  Operation timed out
+const RD_KAFKA_RESP_ERR__TIMED_OUT = -185
+#  Queue is full
+const RD_KAFKA_RESP_ERR__QUEUE_FULL = -184
+#  ISR count < required.acks
+const RD_KAFKA_RESP_ERR__ISR_INSUFF = -183
+#  Broker node update
+const RD_KAFKA_RESP_ERR__NODE_UPDATE = -182
+#  SSL error
+const RD_KAFKA_RESP_ERR__SSL = -181
+#  Waiting for coordinator to become available.
+const RD_KAFKA_RESP_ERR__WAIT_COORD = -180
+#  Unknown client group
+const RD_KAFKA_RESP_ERR__UNKNOWN_GROUP = -179
+#  Operation in progress
+const RD_KAFKA_RESP_ERR__IN_PROGRESS = -178
+#  Previous operation in progress, wait for it to finish.
+const RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS = -177
+#  This operation would interfere with an existing subscription
+const RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION = -176
+#  Assigned partitions (rebalance_cb)
+const RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS = -175
+#  Revoked partitions (rebalance_cb)
+const RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS = -174
+#  Conflicting use
+const RD_KAFKA_RESP_ERR__CONFLICT = -173
+#  Wrong state
+const RD_KAFKA_RESP_ERR__STATE = -172
+#  Unknown protocol
+const RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL = -171
+#  Not implemented
+const RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED = -170
+#  Authentication failure
+const RD_KAFKA_RESP_ERR__AUTHENTICATION = -169
+#  No stored offset
+const RD_KAFKA_RESP_ERR__NO_OFFSET = -168
+#  Outdated
+const RD_KAFKA_RESP_ERR__OUTDATED = -167
+#  Timed out in queue
+const RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE = -166
+#  Feature not supported by broker
+const RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE = -165
+#  Awaiting cache update
+const RD_KAFKA_RESP_ERR__WAIT_CACHE = -164
+#  Operation interrupted (e.g., due to yield)
+const RD_KAFKA_RESP_ERR__INTR = -163
+#  Key serialization error
+const RD_KAFKA_RESP_ERR__KEY_SERIALIZATION = -162
+#  Value serialization error
+const RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION = -161
+#  Key deserialization error
+const RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION = -160
+#  Value deserialization error
+const RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION = -159
+#  Partial response
+const RD_KAFKA_RESP_ERR__PARTIAL = -158
+#  Modification attempted on read-only object
+const RD_KAFKA_RESP_ERR__READ_ONLY = -157
+#  No such entry / item not found
+const RD_KAFKA_RESP_ERR__NOENT = -156
+#  Read underflow
+const RD_KAFKA_RESP_ERR__UNDERFLOW = -155
+
+#  End internal error codes
+const RD_KAFKA_RESP_ERR__END = -100
+
+##  Kafka broker errors:
+#  Unknown broker error
+const RD_KAFKA_RESP_ERR_UNKNOWN = -1
+#  Success
+const RD_KAFKA_RESP_ERR_NO_ERROR = 0
+#  Offset out of range
+const RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE = 1
+#  Invalid message
+const RD_KAFKA_RESP_ERR_INVALID_MSG = 2
+#  Unknown topic or partition
+const RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART = 3
+#  Invalid message size
+const RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE = 4
+#  Leader not available
+const RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE = 5
+#  Not leader for partition
+const RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION = 6
+#  Request timed out
+const RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT = 7
+#  Broker not available
+const RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE = 8
+#  Replica not available
+const RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE = 9
+#  Message size too large
+const RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE = 10
+#  StaleControllerEpochCode
+const RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH = 11
+#  Offset metadata string too large
+const RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE = 12
+#  Broker disconnected before response received
+const RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION = 13
+#  Group coordinator load in progress
+const RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS = 14
+#  Group coordinator not available
+const RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15
+#  Not coordinator for group
+const RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP = 16
+#  Invalid topic
+const RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION = 17
+#  Message batch larger than configured server segment size
+const RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE = 18
+#  Not enough in-sync replicas
+const RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS = 19
+#  Message(s) written to insufficient number of in-sync replicas
+const RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20
+#  Invalid required acks value
+const RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS = 21
+#  Specified group generation id is not valid
+const RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION = 22
+#  Inconsistent group protocol
+const RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL = 23
+#  Invalid group.id
+const RD_KAFKA_RESP_ERR_INVALID_GROUP_ID = 24
+#  Unknown member
+const RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID = 25
+#  Invalid session timeout
+const RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT = 26
+#  Group rebalance in progress
+const RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS = 27
+#  Commit offset data size is not valid
+const RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE = 28
+#  Topic authorization failed
+const RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED = 29
+#  Group authorization failed
+const RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED = 30
+#  Cluster authorization failed
+const RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED = 31
+#  Invalid timestamp
+const RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP = 32
+#  Unsupported SASL mechanism
+const RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM = 33
+#  Illegal SASL state
+const RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE = 34
+#  Unsupported version
+const RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION = 35
+#  Topic already exists
+const RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS = 36
+#  Invalid number of partitions
+const RD_KAFKA_RESP_ERR_INVALID_PARTITIONS = 37
+#  Invalid replication factor
+const RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR = 38
+#  Invalid replica assignment
+const RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT = 39
+#  Invalid config
+const RD_KAFKA_RESP_ERR_INVALID_CONFIG = 40
+#  Not controller for cluster
+const RD_KAFKA_RESP_ERR_NOT_CONTROLLER = 41
+#  Invalid request
+const RD_KAFKA_RESP_ERR_INVALID_REQUEST = 42
+#  Message format on broker does not support request
+const RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43
+#  Isolation policy violation
+const RD_KAFKA_RESP_ERR_POLICY_VIOLATION = 44
+#  Broker received an out of order sequence number
+const RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45
+#  Broker received a duplicate sequence number
+const RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER = 46
+#  Producer attempted an operation with an old epoch
+const RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH = 47
+#  Producer attempted a transactional operation in an invalid state
+const RD_KAFKA_RESP_ERR_INVALID_TXN_STATE = 48
+#  Producer attempted to use a producer id which is not
+#  currently assigned to its transactional id
+const RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING = 49
+#  Transaction timeout is larger than the maximum
+#  value allowed by the broker's max.transaction.timeout.ms
+const RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT = 50
+#  Producer attempted to update a transaction while another
+#  concurrent operation on the same transaction was ongoing
+const RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS = 51
+#  Indicates that the transaction coordinator sending a
+#  WriteTxnMarker is no longer the current coordinator for a
+#  given producer
+const RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED = 52
+#  Transactional Id authorization failed
+const RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53
+#  Security features are disabled
+const RD_KAFKA_RESP_ERR_SECURITY_DISABLED = 54
+#  Operation not attempted
+const RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED = 55

+ 30 - 0
test/runtests.jl

@@ -0,0 +1,30 @@
+using RDKafka
+using Test
+
+a = RDKafka.rd_kafka_version() # 17105151 or 0x010500ff for 1.5.0
+# verify the major and minor. Any change to these should be reflected
+# by a change in Project.toml
+@test (a & 0x01000000) == 0x01000000
+@test (a & 0x00050000) == 0x00050000
+
+
+conf =  RDKafka.kafka_conf_new()
+@test RDKafka.kafka_conf_get(conf, "socket.keepalive.enable") == "false"
+RDKafka.kafka_conf_set(conf, "socket.keepalive.enable", "true")
+@test RDKafka.kafka_conf_get(conf, "socket.keepalive.enable") == "true"
+
+# Verify that uninitialised memory is not exposed to the caller
+@test RDKafka.kafka_conf_get(conf, "foobar") == ""
+
+RDKafka.kafka_conf_destroy(conf)
+@test true # No exceptions
+
+@testset "Verify error callback is called" begin
+    conf = Dict()
+    conf["bootstrap.servers"] = "bad"
+    ch = Channel(1)
+    RDKafka.KafkaProducer(conf; err_cb=(err, reason) -> begin
+        push!(ch, err)
+    end)
+    @test take!(ch) == -193
+end