Do Multiple Tasks with a Single Script

Provided with Professional license.

Write JavaScript code (ECMAScript 5.1) to manipulate topics and messages across Kafka clusters. See Script Reference for exact definitions of all objects.

Running Script in Validation Mode

After entering your script in the editor window, it is recommended to run it in validation mode first: click the ‘Validate Script’ button.

In validation mode no changes are made in the cluster. Only read-only metadata operations are executed; for all other operations only the parameters are validated.

Script Timeout

Script execution is stopped when the timeout expires. The default timeout is 1800 seconds (30 minutes). You can configure a different timeout value for script execution.

For a Docker container, use the environment variable KMAGIC_SCRIPT_TIMEOUT_SEC.

For the desktop app, use the configuration parameter SCRIPT_TIMEOUT_SEC in the appsettings.json file.

Cluster Configuration

To perform an operation on a Kafka cluster you need to configure a connection to the cluster. Many Automation Script functions accept cluster connection configuration in one of two forms:

  • string - the Cluster Name under which you registered the cluster in the Kafka Magic app; the name is shown in the cluster explorer tree.
  • KafkaClusterConfiguration object - configuration object describing all parameters needed to connect to the cluster.

Cluster Name example:

var metadata = Magic.getClusterMetadata('My cluster');

Explicit Configuration Object example:

var config = {
  "BootstrapServers": "localhost:9092",
  "SchemaRegistry": {
      "SchemaRegistryUrl": "http://localhost:8081"
  }
};
var metadata = Magic.getClusterMetadata(config);
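
The two forms can also be handled in one place. The following is a minimal sketch, not part of the Kafka Magic API: buildClusterConfig and describeCluster are hypothetical helper names, and only the BootstrapServers and SchemaRegistry.SchemaRegistryUrl properties shown above are assumed to exist.

```javascript
// Hypothetical helper (not a Kafka Magic function): builds an explicit
// configuration object using only the property names shown above.
function buildClusterConfig(bootstrapServers, schemaRegistryUrl) {
    var config = { BootstrapServers: bootstrapServers };
    if (schemaRegistryUrl) {
        // The Schema Registry section is added only when a URL is supplied.
        config.SchemaRegistry = { SchemaRegistryUrl: schemaRegistryUrl };
    }
    return config;
}

// Hypothetical helper: accepts either form and returns a printable label,
// for example to pass to Magic.reportProgress().
function describeCluster(cluster) {
    return typeof cluster === 'string'
        ? 'registered cluster "' + cluster + '"'
        : 'explicit configuration for ' + cluster.BootstrapServers;
}

// In a script (Magic is provided by Kafka Magic):
// var metadata = Magic.getClusterMetadata(buildClusterConfig('localhost:9092'));
```

Keeping the configuration in one helper means the connection details are written once and reused by every command in the script.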

‘return’ statement

Any object returned from the script will be displayed in the Results window in JSON format.

Magic - root object

Use the global Magic object to perform cluster-wide operations and to get a topic reference.

Magic.validatingOnly boolean property

Boolean property returning true if the script is running in validation mode.

You can inspect this property, for example, when you need to bypass a part of the script which would otherwise throw an exception in read-only validation.

Example:
if (Magic.validatingOnly) {
    
    Magic.reportProgress('Validating.');
}
else {

    Magic.reportProgress('Executing for real.');
}

return 'Done!';

Magic.makeClusterConfig(bootstrapServers) function

Helper function that creates a KafkaClusterConfiguration object, optionally with the bootstrapServers property set. This convenience function performs no cluster operation; it only returns an object that works with IntelliSense autocomplete suggestions in the editor. You can set other configuration properties on it and pass it to any command that requires a cluster configuration.

Use this function if you want to manage a cluster that is not registered in the Kafka Magic app’s configuration store.

Example:
// make and populate configuration object:
var config = Magic.makeClusterConfig('localhost:9092');

config.SchemaRegistry.SchemaRegistryUrl = 'http://localhost:8081';

// use it in other commands:
var meta = Magic.getClusterMetadata(config);

return meta;

Magic.getClusterConfig(clusterName) function

Returns a cluster configuration object KafkaClusterConfiguration created by reading data from Kafka Magic app’s configuration store (file or memory). Because sensitive info (secrets) is removed, the resulting object can’t be used “as-is” for connecting to the cluster.

Parameters:
  • clusterName: string - name of the cluster registered in the Kafka Magic app’s configuration store.
Example:
var config = Magic.getClusterConfig('My cluster');
return config;

Magic.getClusterMetadata(cluster) function

Retrieves metadata for the brokers and for all topics in the cluster.

Parameters:
  • cluster: string or KafkaClusterConfiguration object - cluster name or configuration object.
Example:
var metadata = Magic.getClusterMetadata('My cluster');
return metadata;

Magic.getTopic(cluster, topicName) function

Gets a Topic reference object representing an existing topic, to be used in topic-level operations.

Parameters:
  • cluster: string or KafkaClusterConfiguration object - cluster name or configuration object.
  • topicName: string - name of the topic.
Example:
var topic = Magic.getTopic('My cluster', 'my-topic');

Magic.createTopic(…) function

Creates a new topic in the cluster, optionally registers an Avro schema for the topic, and returns a Topic reference object representing the created topic, to be used in topic-level operations. If Avro schema registration is requested, the cluster configuration must contain Schema Registry connection parameters. The call can take several seconds to complete.

Parameters:
  • cluster: string or KafkaClusterConfiguration object - cluster name or configuration object.
  • topicName: string - name of the new topic.
  • partitions: number - number of partitions for this topic, optional, defaults to 1.
  • replicationFactor: number - number of replicas for this topic, optional, defaults to 1.
  • avroSchema: string or object - JSON string or object representation of the Avro schema, optional, defaults to null.
Example:
var topic = Magic.createTopic('My cluster', 'new-topic', 4);
return 'Topic created!';
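
The example above uses only the first three parameters. The sketch below shows one way the optional avroSchema parameter might be supplied. createAvroTopic is a hypothetical helper name, the record and field names in the schema are illustrative, and the Magic object is passed in as an argument only so that the function can also be exercised with a stub outside Kafka Magic.

```javascript
// Hypothetical helper: creates a topic and asks for an Avro schema to be
// registered for it. `magic` is expected to be the global Magic object.
function createAvroTopic(magic, cluster, topicName, partitions) {
    // A plain Avro record schema; the record and field names are illustrative.
    var schema = {
        type: 'record',
        name: 'SampleRecord',
        fields: [
            { name: 'Field1', type: 'double' },
            { name: 'Field2', type: 'string' }
        ]
    };
    // Argument order follows the parameter list above:
    // cluster, topicName, partitions, replicationFactor, avroSchema.
    return magic.createTopic(cluster, topicName, partitions, 1, schema);
}

// In a script:
// var topic = createAvroTopic(Magic, 'My cluster', 'new-avro-topic', 4);
```

As noted in the description, the cluster configuration must contain Schema Registry connection parameters for the schema registration to succeed.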

Magic.createTopicClone(…) function

Creates a new topic in the cluster, copying the number of partitions and the Avro schema from a source topic, which can be in a different cluster. Returns a Topic reference object representing the created topic.

Parameters:
  • cluster: string or KafkaClusterConfiguration object - cluster name or configuration object.
  • newTopicName: string - name of the new topic.
  • sourceTopic: Topic reference object - containing topicName and cluster configuration defining source topic to be cloned.
  • partitions: number - number of partitions for this topic, optional, if provided overrides number of partitions in the source topic.
  • replicationFactor: number - number of replicas for this topic, optional, defaults to 1.
Example:
var sourceTopic = Magic.getTopic('My cluster', 'old-topic');

var topic = Magic.createTopicClone('My cluster', 'new-topic', sourceTopic);

return 'Topic created: ' + topic.topicName;
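
Since the source topic may live in a different cluster, the same call can be used to copy a topic definition between clusters. The following is a sketch under that assumption; cloneTopicToCluster is a hypothetical helper, and the cluster names in the usage comment are placeholders.

```javascript
// Hypothetical helper: clones a topic definition (partitions and Avro schema)
// from one cluster into another under the same topic name.
// `magic` is expected to be the global Magic object.
function cloneTopicToCluster(magic, sourceCluster, targetCluster, topicName, partitions) {
    var sourceTopic = magic.getTopic(sourceCluster, topicName);
    if (partitions === undefined) {
        // No override: the partition count is copied from the source topic.
        return magic.createTopicClone(targetCluster, topicName, sourceTopic);
    }
    // Override the partition count of the source topic.
    return magic.createTopicClone(targetCluster, topicName, sourceTopic, partitions);
}

// In a script (cluster names are placeholders):
// var topic = cloneTopicToCluster(Magic, 'Prod cluster', 'Test cluster', 'orders');
```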

Magic.getAvroSchema(…) function

Retrieves Avro schema object from an Avro schema registry by subject (topic name).

Parameters:
  • config: string or SchemaRegistryConfiguration object - name of the cluster registered in Kafka Magic app’s configuration store, or SchemaRegistryConfiguration object defining source Avro schema registry connection.
  • topicName: string - Subject (topic name) for which the schema was registered.
  • version: number - version number of the schema, optional; if not provided, the latest version is retrieved.
Example 1:
var schema = Magic.getAvroSchema('My cluster', 'my-avro-topic');
return schema;
Example 2:
var registryConfig = {
    "SchemaRegistryUrl": "http://localhost:8081"
};
var schema = Magic.getAvroSchema(registryConfig, 'my-avro-topic');
return schema;

Magic.registerAvroSchema(…) function

Registers Avro schema in the Avro schema registry.

Parameters:
  • config: string or SchemaRegistryConfiguration object - name of the cluster registered in Kafka Magic app’s configuration store, or SchemaRegistryConfiguration object defining Avro schema registry connection.
  • topicName: string - Subject (topic name) for which the schema should be registered.
  • schema: string or object - JSON string or object representation of the Avro schema.
Example:
var schema = Magic.getAvroSchema('My cluster', 'some-avro-topic'); // reading existing schema

var newField = {
    name: 'MyNewField',
    type: 'int',
    doc: 'Some random number'
};
schema.fields.push(newField); // modifying schema by adding new field

var newSubject = 'new-topic'; // target topic

Magic.registerAvroSchema('My cluster', newSubject, schema); // registering schema for target topic

Magic.reportProgress('Registered new schema, trying to read it.');

var newSchema = Magic.getAvroSchema('My cluster', newSubject);

return newSchema; // displaying new schema in Results window
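
If a script like the one above is run more than once against the same schema, the new field would be pushed again. A small guard can avoid that. This is only a sketch: addFieldIfMissing is a hypothetical helper, and it assumes the schema object exposes a fields array, as the example above does.

```javascript
// Hypothetical helper: adds a field to an Avro record schema object only if
// no field with the same name is present. Returns true when the field was added.
function addFieldIfMissing(schema, field) {
    var i;
    for (i = 0; i < schema.fields.length; i++) {
        if (schema.fields[i].name === field.name) {
            return false; // already present, schema left untouched
        }
    }
    schema.fields.push(field);
    return true;
}

// In a script:
// var schema = Magic.getAvroSchema('My cluster', 'some-avro-topic');
// if (addFieldIfMissing(schema, { name: 'MyNewField', type: 'int' })) {
//     Magic.registerAvroSchema('My cluster', 'new-topic', schema);
// }
```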

Magic.reportProgress(data) function

Use this function to display arbitrary data in the Results window during script execution.

Parameters:
  • data: any - arbitrary data
Example:
Magic.reportProgress('Display this string in the Results window');

Magic.delay(time) function

Use this function to delay script execution for the specified number of milliseconds, for example, when you have to wait for a change to take effect.

Parameters:
  • time: number - delay time in milliseconds
Example:
Magic.reportProgress(new Date());

Magic.delay(5000);

return new Date();
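
A fixed delay can be replaced by polling when the waiting time is not known in advance. The sketch below retries topic.getMetadata(), which throws while the topic does not exist, and calls Magic.delay() between attempts. waitForTopic is a hypothetical helper, and the attempt count and delay are arbitrary values chosen for illustration.

```javascript
// Hypothetical helper: polls until the topic's metadata can be read.
// Returns the metadata object, or null if every attempt failed.
// `magic` is expected to be the global Magic object.
function waitForTopic(magic, topic, maxAttempts, delayMs) {
    var attempt;
    for (attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return topic.getMetadata(); // throws while the topic does not exist
        } catch (e) {
            if (attempt < maxAttempts) {
                magic.delay(delayMs); // wait before the next attempt
            }
        }
    }
    return null;
}

// In a script:
// var topic = Magic.createTopic('My cluster', 'new-topic');
// var meta = waitForTopic(Magic, topic, 5, 1000);
```

Keep the total waiting time well below the script timeout, since the script is stopped when the timeout expires.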

Topic reference object

A Topic reference object represents an existing topic and is used in topic-level operations. You get this object as a result of some methods of the Magic global object, such as getTopic(), createTopic(), and createTopicClone().

Example:
var topic = Magic.getTopic('My cluster', 'new-topic');

try {

    topic.getMetadata(); // throws exception if topic doesn't exist
    
    Magic.reportProgress("Topic exists, delete it.");

    topic.delete();

    return "Topic 'new-topic' deleted.";
}
catch (e) {

    Magic.reportProgress("Topic not found, create it.");

    topic = Magic.createTopic('My cluster', 'new-topic');

    return "Topic 'new-topic' created.";
}

topic.getMetadata() function

Returns the topic metadata object; throws an exception if the topic doesn’t exist.

Example:
var topic = Magic.getTopic('My cluster', 'some-topic');
var meta = topic.getMetadata();
return meta;

Publishing to a topic

You can publish (produce) JSON or Avro serialized messages to a Kafka topic using a Topic reference object. You can provide the message content only, or put the message in a Context object containing the content, headers, a key, and optionally a partition id. You can publish a single message or an array of messages in a single step, and you can also set the message compression type and level.

topic.publishMessage(…) function

Publish a single message without providing a key or headers. If Avro serialization is requested, the message properties must conform to the schema.

Parameters:
  • message: object - message content.
  • isAvro: boolean - true if message should be Avro serialized. Optional, defaults to false.
  • options: PublishingOptions object - contains PartitionId, CompressionType, CompressionLevel properties. Optional.
Example:
var topic = Magic.getTopic('My cluster', 'my-topic');

var message = {
  "Field1": 123.45,
  "Field2": "test message"
};

// publish
topic.publishMessage(message);

return message;
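
The example above relies on the defaults for isAvro and options. The following sketch passes all three parameters to direct a message to a specific partition. publishToPartition is a hypothetical helper; only the PartitionId property named in the parameter list is used, because the accepted CompressionType and CompressionLevel values are not listed here.

```javascript
// Hypothetical helper: publishes one JSON message to a chosen partition.
// `topic` is a Topic reference object, e.g. from Magic.getTopic().
function publishToPartition(topic, message, partitionId) {
    var options = { PartitionId: partitionId };
    // Second argument is isAvro: false means the message is JSON serialized.
    topic.publishMessage(message, false, options);
    return options;
}

// In a script:
// var topic = Magic.getTopic('My cluster', 'my-topic');
// publishToPartition(topic, { "Field1": 123.45, "Field2": "test message" }, 0);
```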

topic.publishMessageContext(…) function

Publish single message with optional key, headers, and partition Id in a Context object. Partition provided in a message context overrides the PartitionId provided in the options parameter. If Avro serialization is requested, the message must conform to the schema.

Parameters:
  • message: MessageContext object - object containing message content, key, headers, and partitionId
  • isAvro: boolean - true if message should be Avro serialized, optional, defaults to false.
  • options: PublishingOptions object - contains PartitionId, CompressionType, CompressionLevel properties, optional.
Example:
var topic = Magic.getTopic('My cluster', 'my-topic');

var messageContext = {
  "Message": {
    "Field1": 123.45,
    "Field2": "test message"
  },
  "Headers": {
    "MyHeader1": "string1",
    "MyHeader2": "string2"
  },
  "Key": [77, 78, 79],
  "Partition": null
};

// publish
topic.publishMessageContext(messageContext);

return messageContext;
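
The Key in the example above is an array of numbers; 77, 78, 79 are the character codes of "MNO". Assuming the key is treated as the bytes of a string, a context object could be assembled from a readable key as sketched below. keyToBytes and makeMessageContext are hypothetical helpers, and the conversion is limited to ASCII keys to keep it unambiguous.

```javascript
// Hypothetical helper: converts an ASCII string key to an array of byte values.
function keyToBytes(key) {
    var bytes = [];
    var i, code;
    for (i = 0; i < key.length; i++) {
        code = key.charCodeAt(i);
        if (code > 127) {
            throw new Error('Only ASCII keys are supported by this sketch.');
        }
        bytes.push(code);
    }
    return bytes;
}

// Hypothetical helper: assembles a context object with the same property
// names as the example above (Message, Headers, Key, Partition).
function makeMessageContext(message, key, headers) {
    return {
        Message: message,
        Headers: headers || {},
        Key: keyToBytes(key),
        Partition: null
    };
}

// In a script:
// topic.publishMessageContext(makeMessageContext({ "Field1": 123.45 }, 'MNO'));
```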

topic.publishMessageArray(…) function

Publish multiple messages without providing a key or headers. If Avro serialization is requested, the message properties must conform to the schema.

Parameters:
  • messages: object array - array of message content objects.
  • isAvro: boolean - true if message should be Avro serialized, optional, defaults to false.
  • options: PublishingOptions object - contains PartitionId, CompressionType, CompressionLevel properties, optional.
Example:
var topic = Magic.getTopic('My cluster', 'my-topic');

var messages = [
 {
  "Field1": 123.45,
  "Field2": "test message 1"
 },
 {
  "Field1": 234.56,
  "Field2": "test message 2"
 }
];

topic.publishMessageArray(messages);

return messages;
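
For a large number of messages it may be preferable to publish in smaller batches and report progress in between. The following is one possible sketch; publishInBatches is a hypothetical helper and the batch size is an arbitrary choice, not a documented limit.

```javascript
// Hypothetical helper: publishes messages in batches of at most batchSize,
// using topic.publishMessageArray() for each batch.
// Returns the number of messages handed to the topic.
function publishInBatches(topic, messages, batchSize) {
    if (!(batchSize > 0)) {
        throw new Error('batchSize must be a positive number.');
    }
    var published = 0;
    var start, batch;
    for (start = 0; start < messages.length; start += batchSize) {
        batch = messages.slice(start, start + batchSize);
        topic.publishMessageArray(batch);
        published += batch.length;
    }
    return published;
}

// In a script:
// var count = publishInBatches(Magic.getTopic('My cluster', 'my-topic'), messages, 100);
// Magic.reportProgress('Published ' + count + ' messages.');
```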

topic.publishMessageContextArray(…) function

Publish multiple messages, each with a key, headers, and partition id in a Context object. Partition provided in a message context overrides the PartitionId provided in the options parameter. If Avro serialization is requested, the message must conform to the schema.

Parameters:
  • messageContextArray: Array of MessageContext objects - each object containing message content, key, headers, and partitionId
  • isAvro: boolean - true if message should be Avro serialized, optional, defaults to false.
  • options: PublishingOptions object - contains PartitionId, CompressionType, CompressionLevel properties, optional.
Example:
var topic = Magic.getTopic('My cluster', 'my-topic');

var messageContextArr = [
    {
    "Message": {
        "Field1": 123.45,
        "Field2": "test message 1"
    },
    "Headers": {
        "MyHeader1": "string1-1",
        "MyHeader2": "string2-1"
    },
    "Key": [77, 78, 79],
    "Partition": null
    },
    {
    "Message": {
        "Field1": 234.56,
        "Field2": "test message 2"
    },
    "Headers": {
        "MyHeader1": "string1-2",
        "MyHeader2": "string2-2"
    },
    "Key": [77, 78, 79],
    "Partition": null
    }
];

// publish
topic.publishMessageContextArray(messageContextArr);

return messageContextArr;

Searching for messages in a topic

Whether a message is JSON or Avro serialized, you can search for it using a JavaScript function that references any combination of the message fields, headers, and metadata.

topic.search(…) function

Searches for messages in the topic. Returns array of MessageContext objects.

Parameters:
  • isAvro: boolean - true if topic message is Avro serialized.
  • maxResults: number - Maximum number of messages to return.
  • filter: function - Callback function accepting MessageContext object and returning boolean value.
  • minTimestamp: Date, string, or number - Minimum message timestamp, to limit search scope by time. Optional.
  • maxTimestamp: Date, string, or number - Maximum message timestamp, to limit search scope by time. Optional.
  • partitionOffsets: Array of PartitionOffset objects - to limit search scope by partitions and offsets. Optional.

PartitionOffset type:

interface PartitionOffset {
    PartitionId: number;
    MinOffset: number;
    MaxOffset: number;
}
Examples:

Search using JavaScript query function:

var topic = Magic.getTopic('My cluster', 'my-topic');

var searchFilter = function (context) {
    return context.Headers.MyHeader1 == 'string1' && context.Message.Field1 > 100;
}

var foundMessages = topic.search(false, 10, searchFilter);

return foundMessages;
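
A filter that reads a header or field directly throws if a message has no headers or no such field. The sketch below builds a more defensive filter. makeFilter is a hypothetical helper; it assumes, as the example above does, that the context exposes Headers and Message properties.

```javascript
// Hypothetical helper: returns a filter callback that matches messages whose
// header equals headerValue and whose numeric field exceeds minValue.
function makeFilter(headerName, headerValue, fieldName, minValue) {
    return function (context) {
        // Fall back to empty objects so missing parts do not throw.
        var headers = context.Headers || {};
        var message = context.Message || {};
        // Loose equality, as in the example above.
        return headers[headerName] == headerValue && message[fieldName] > minValue;
    };
}

// In a script:
// var filter = makeFilter('MyHeader1', 'string1', 'Field1', 100);
// var foundMessages = topic.search(false, 10, filter);
```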

Search using timestamp range parameters: minTimestamp and maxTimestamp:

var topic = Magic.getTopic('My cluster', 'my-topic');

var foundMessages = topic.search(
    false, 
    10, 
    function (context) { return true; }, 
    '2020-02-20T13:14:32', 
    '2020-02-20T14:00:00');

return foundMessages;
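
Because the timestamp parameters also accept Date objects, a relative time window can be computed in the script instead of hard-coding strings. This is a minimal sketch; searchRecent is a hypothetical helper, and the optional now argument exists only to make the window reproducible.

```javascript
// Hypothetical helper: searches JSON messages published in the last
// `minutes` minutes. `topic` is a Topic reference object.
function searchRecent(topic, minutes, maxResults, now) {
    var end = now || new Date();
    var start = new Date(end.getTime() - minutes * 60 * 1000);
    var matchAll = function (context) { return true; };
    // Parameters: isAvro, maxResults, filter, minTimestamp, maxTimestamp.
    return topic.search(false, maxResults, matchAll, start, end);
}

// In a script:
// var foundMessages = searchRecent(Magic.getTopic('My cluster', 'my-topic'), 30, 10);
```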

Search in a single partition while limiting search scope by minimum offset:

var topic = Magic.getTopic('My cluster', 'my-topic');

var foundMessages = topic.search(false, 10, null, null, null, [{ PartitionId: 2, MinOffset: 3 }]);

return foundMessages;
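
Since topic.search() returns MessageContext objects and topic.publishMessageContextArray() accepts them, the two can be combined to copy messages between topics. The sketch below is an illustration only: copyMessages is a hypothetical helper, and it assumes that the returned contexts expose Message, Headers, and Key under the same names used in the publishing examples.

```javascript
// Hypothetical helper: copies up to maxResults JSON messages from one topic
// to another. Returns the number of messages copied.
function copyMessages(sourceTopic, targetTopic, maxResults) {
    var found = sourceTopic.search(false, maxResults, function (context) { return true; });
    var contexts = [];
    var i;
    for (i = 0; i < found.length; i++) {
        contexts.push({
            Message: found[i].Message,
            Headers: found[i].Headers,
            Key: found[i].Key,
            Partition: null // not pinned to the source partition
        });
    }
    if (contexts.length > 0) {
        targetTopic.publishMessageContextArray(contexts);
    }
    return contexts.length;
}

// In a script:
// var source = Magic.getTopic('My cluster', 'my-topic');
// var target = Magic.getTopic('My cluster', 'my-topic-copy');
// Magic.reportProgress('Copied ' + copyMessages(source, target, 100) + ' messages.');
```

Running the script in validation mode first is a cheap way to check the parameters before any message is written.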

Other topic functions

topic.getStats() function

Gets topic statistics: min/max offsets and timestamps for each partition. Returns Array of PartitionStats objects.

Example:
var topic = Magic.getTopic('My cluster', 'my-topic');

var stats = topic.getStats();

return stats;

topic.delete() function

Deletes the topic and all data in it. Can take several seconds to complete.

Example:
var topic = Magic.getTopic('My cluster', 'my-topic');

// delete topic
topic.delete();