Publish messages to Kafka topics

Provided with the Professional license.

You can publish (produce) JSON or Avro serialized messages to a Kafka topic using the User Interface or an Automation Script. In both cases you can provide the message content directly, or put the message in a Context object containing the content, headers, and a key. You can publish a single message or an array of messages in a single step. You can also set the message compression type and level.

Publishing using UI

Kafka Magic: Publish message through UI

Publishing a single message

This is the simplest way to publish a message. Select your cluster and topic, and provide JSON describing the message fields. Make sure the ‘Use Avro serialization’ checkbox is set appropriately, and uncheck the ‘Array as multiple messages’ and ‘Recognize message Context’ checkboxes.

Message example:

{
  "Field1": 123.45,
  "Field2": "test message"
}

Publishing a message with headers and a key

When you want to supplement the message with a key or headers, provide the message in the Context format, which is similar to what you see when reading messages from a topic. The Headers field contains an object whose fields represent the headers, the Key field contains a binary array representing the key, and the Message field contains the message content. You can also provide a Partition Id if you want to publish to a specific topic partition. All Context fields are optional.

Don’t forget to check the ‘Recognize message Context’ checkbox.

Message Context example:

{
  "Headers": {
    "MyHeader1": "string1",
    "MyHeader2": "string2"
  },
  "Key": [77, 78, 79],
  "Message": {
    "VendorID": 2,
    "pickup_datetime": "2016-01-01 00:00:00",
    "dropoff_datetime": "2016-01-01 00:00:00",
    "trip_distance": 1.1,
    "total_amount": 8.8
  },
  "Partition": null
}

Publishing multiple messages in a single step

You can publish multiple messages (with or without Context) by providing them in a JSON array.

Don’t forget to check the ‘Array as multiple messages’ checkbox.

Message array example:

[
 {
  "Field1": 123.45,
  "Field2": "test message 1"
 },
 {
  "Field1": 234.56,
  "Field2": "test message 2"
 }
]

Publishing using Automation Script

You can perform a complex set of publishing operations using Kafka Magic Automation Script.

Publishing example:

// create new topic
var topic = Magic.createTopic('My cluster name', 'my_new_topic_name');

// create message object
var message = { "myField1": "bar", "myField2": 3 };

// publish without Avro serialization
topic.publishMessage(message, false);

There are several functions you can use to publish messages to topics. Here are the definitions:

/**
 * Publish a single message without providing a key or headers.
 * If the topic was registered with an Avro schema, the message properties must conform to the schema.
 * @param message object
 * @param isAvro boolean true if the message should be Avro serialized
 * @param options PublishingOptions object: PartitionId, CompressionType, CompressionLevel
 */
publishMessage(message: any, isAvro: boolean, options?: PublishingOptions): void;

/**
 * Publish a single message with an optional key, headers, and/or partitionId in a Context object.
 * If the topic was registered with an Avro schema, the message must conform to the schema.
 * @param message `MessageContext` object containing message, key, headers, and/or partitionId
 * @param isAvro boolean true if the message should be Avro serialized
 * @param options PublishingOptions object: PartitionId, CompressionType, CompressionLevel
 */
publishMessageContext(message: MessageContext, isAvro: boolean, options?: PublishingOptions): void;
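
A minimal usage sketch, assuming the topic object returned by Magic.createTopic in the example above, and assuming the script-side MessageContext uses the same field names as the Context format shown in the UI section (Headers, Key, Message, Partition):

// publish a single message with headers, a key, and no explicit partition
var context = {
  Headers: { MyHeader1: "string1", MyHeader2: "string2" },
  Key: [77, 78, 79],        // key as a byte array
  Message: { Field1: 123.45, Field2: "test message" },
  Partition: null           // let the cluster pick the partition
};

topic.publishMessageContext(context, false);  // false = no Avro serialization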

/**
 * Publish multiple messages without providing keys or headers.
 * If the topic was registered with an Avro schema, the messages must conform to the schema.
 * @param messages array of objects
 * @param isAvro boolean true if the messages should be Avro serialized
 * @param options PublishingOptions object: PartitionId, CompressionType, CompressionLevel
 */
publishMessageArray(messages: any[], isAvro: boolean, options?: PublishingOptions): void;
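
A usage sketch for publishing several plain messages in one call (same assumption about the topic object as above):

// two messages published in a single call, no Avro serialization
var messages = [
  { Field1: 123.45, Field2: "test message 1" },
  { Field1: 234.56, Field2: "test message 2" }
];

topic.publishMessageArray(messages, false);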

/**
 * Publish multiple messages, each with a key and headers in a Context object.
 * If the topic was registered with an Avro schema, every message must conform to the schema.
 * @param messageContextArray Array of `MessageContext` objects, each containing message, key and headers
 * @param isAvro boolean true if the messages should be Avro serialized
 * @param options PublishingOptions object: PartitionId, CompressionType, CompressionLevel
 */
publishMessageContextArray(messageContextArray: MessageContext[], isAvro: boolean, options?: PublishingOptions): void;
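
And a sketch for publishing several Context objects at once (same assumptions about the topic object and the MessageContext field names):

// each element carries its own message, key, and headers
var contextArray = [
  { Headers: { Source: "script1" }, Key: [1], Message: { Field1: 1 } },
  { Headers: { Source: "script2" }, Key: [2], Message: { Field1: 2 } }
];

topic.publishMessageContextArray(contextArray, false);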


interface PublishingOptions {
    PartitionId: number;
    CompressionType: string;
    CompressionLevel: number;
}
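
A sketch of passing PublishingOptions (the 'Gzip' value for CompressionType is only an illustrative assumption; check which compression type names your Kafka Magic version accepts):

// hypothetical options: publish to partition 0 with Gzip compression at level 5
var options = {
  PartitionId: 0,
  CompressionType: "Gzip",   // assumed value, verify supported names
  CompressionLevel: 5
};

topic.publishMessage({ Field1: 42 }, false, options);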

See the complete Automation Script description here.