Script Reference


Below is the complete set of type definitions for the Kafka Magic Automation Script objects.

declare var Magic: KafkaMagic.Cluster;

declare namespace KafkaMagic {
    interface MessageContext {
        Timestamp: Date;
        Topic: string;
        Partition: number;
        Offset: number;
        SchemaId: number;
        SchemaType: string;
        Key: number[];
        Headers: any;
        Message: any;
        Error: string;
    }

    interface Cluster {
        /**
         * @returns true if script runs in validation mode.
         */
        validatingOnly: boolean;

        /**
         * Creates a cluster configuration object `KafkaClusterConfiguration` by reading data from KafkaMagic configuration store (file or memory).
         * Sensitive info removed, can't be used for connecting to cluster.
         * @param clusterName string name of the cluster registered in KafkaMagic configuration store.
         * @returns `KafkaClusterConfiguration` object retrieved from the configuration store.
         */
        getClusterConfig(clusterName: string): KafkaClusterConfiguration;

        /**
         * Helper method to make cluster configuration object `KafkaClusterConfiguration` with `bootstrapServers` property set.
         * You can set other configuration parameters and use this object in other commands in place of cluster name.
         * @param bootstrapServers A list of brokers as a CSV list of `host` or `host:port` values. @example `'localhost:9092'`
         * @returns `KafkaClusterConfiguration` object with `bootstrapServers` property set.
         */
        makeClusterConfig(bootstrapServers: string): KafkaClusterConfiguration;

        /**
         * Retrieves brokers, topics metadata for all topics in the cluster.
         * @param cluster string name of the cluster registered in KafkaMagic configuration store, or `KafkaClusterConfiguration` object defining all connection parameters for the cluster.
         * @returns `Metadata` object
         */
        getClusterMetadata(cluster: string | KafkaClusterConfiguration): Metadata;

        /**
         * Makes `Topic` object representing existing topic, to be used in searching and publishing operations.
         * @param cluster string name of the cluster registered in KafkaMagic configuration store, or `KafkaClusterConfiguration` object defining all connection parameters for the cluster.
         * @param topicName Name of the topic.
         * @returns `Topic` object
         */
        getTopic(cluster: string | KafkaClusterConfiguration, topicName: string): Topic;

        /**
         * Creates new topic in the cluster, returns `Topic` object representing created topic, to be used in searching and publishing operations.
         * @param cluster string name of the cluster registered in KafkaMagic configuration store, or `KafkaClusterConfiguration` object defining all connection parameters for the cluster.
         * @param topicName Name of the topic.
         * @param partitions Number of partitions for this topic, optional, defaults to 1
         * @param replicationFactor Number of replicas for this topic, optional, defaults to 1
         * @param avroSchema JSON string or object representation of Avro schema, defaults to `null`
         * @returns `Topic` object
         */
        createTopic(cluster: string | KafkaClusterConfiguration,
            topicName: string,
            partitions?: number,
            replicationFactor?: number,
            avroSchema?: any): Topic;

        /**
         * Retrieves schema object from Schema Registry by subjectName.
         * @param config string name of the cluster registered in KafkaMagic configuration store, 
         * or `SchemaRegistryConfiguration` object defining source schema registry connection.
         * @param subjectName Subject the schema is registered against.
         * @param version Optional version number of the schema, if not provided - gets latest version
         * @returns object representation of the schema
         */
        getSchema(config: string | SchemaRegistryConfiguration, subjectName: string, version?: number): any;

        /**
         * Retrieves schema object from Schema Registry by schemaId.
         * @param config string name of the cluster registered in KafkaMagic configuration store, 
         * or `SchemaRegistryConfiguration` object defining source schema registry connection.
         * @param schemaId Id of the schema.
         * @returns object representation of the schema
         */
        getSchemaById(config: string | SchemaRegistryConfiguration, schemaId: number): any;

        /**
         * Registers schema in the Schema Registry.
         * @param config String name of the cluster registered in KafkaMagic configuration store,
         * or `SchemaRegistryConfiguration` object defining target schema registry connection.
         * @param topicName Subject the schema should be registered against.
         * @param schema JSON string or object representation of the schema
         * @param schemaType: string, allowed values: 'Avro', 'Json'
         * @param compatibility: string, allowed values: 'NONE', 'FORWARD', 'BACKWARD', 'FULL', 'BACKWARD_TRANSITIVE', 'FORWARD_TRANSITIVE', 'FULL_TRANSITIVE'
         * @returns schemaId
        */
        registerSchema(config: string | SchemaRegistryConfiguration, subjectName: string, schema: string, schemaType: string, compatibility: string): number;

        /**
         * Deletes schema version from Schema Registry by subjectName and version. If version is not provided - deletes all versions.
         * @param config string name of the cluster registered in KafkaMagic configuration store, 
         * or `SchemaRegistryConfiguration` object defining source schema registry connection.
         * @param subjectName Subject the schema is registered against.
         * @param version Optional version number of the schema, if not provided - deletes all versions
         * @returns array of deleted schema Ids
         */
        deleteSchemaVersion(config: string | SchemaRegistryConfiguration, subjectName: string, version?: number): number[];

        /**
         * Permanently deletes all schema versions from Schema Registry by subjectName.
         * @param config string name of the cluster registered in KafkaMagic configuration store, 
         * or `SchemaRegistryConfiguration` object defining source schema registry connection.
         * @param subjectName Subject the schema is registered against.
         * @returns array of deleted schema Ids
         */
        deleteSchema(config: string | SchemaRegistryConfiguration, subjectName: string): number[];

        /**
         * Adds arbitrary data to JSON array displayed in results window
         * @param data Any object to be displayed in the results window
         */
        reportProgress(data: any): void;

        /**
         * Delay execution
         * @param time Number of milliseconds
         */
        delay(time: number): void;
    }

    /**
     * `Topic` object representing Topic configuration and related operations
     */
    interface Topic {

        /**
         * Topic name 
         */
        topicName: string;

        /**
         * Name of the cluster registered in KafkaMagic configuration store,
         * or `KafkaClusterConfiguration` object defining all connection parameters for the cluster
         */
        cluster: string | KafkaClusterConfiguration;

        /**
         * Get topic metadata
         * @returns `TopicMetadata` object
         */
        getMetadata(): TopicMetadata;

        /**
         * Publish single message without providing a key or headers. 
         * If Avro serialization is requested, the message properties must conform to the schema.
         * @param message object content of the message
         * @param useSchema boolean true if message should be serialized using registered schema selected by provided subject or by default subject name strategy
         * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
         */
        publishMessage(message: any, useSchema: boolean, options?: PublishingOptions): void;

        /**
         * Publish single message with optional key, headers, and partition Id in a Context object.
         * If Avro serialization is requested, the message must conform to the schema.
         * @param message `MessageContext` object containing message, key, headers, and partitionId
         * @param useSchema boolean true if message should be serialized using registered schema selected by provided subject or by default subject name strategy
         * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
         */
        publishMessageContext(message: MessageContext, useSchema: boolean, options?: PublishingOptions): void;

        /**
         * Publish multiple messages without providing keys or headers.
         * If Avro serialization is requested, the message must conform to the schema.
         * @param messages array of objects
         * @param useSchema boolean true if message should be serialized using registered schema selected by provided subject or by default subject name strategy
         * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
         */
        publishMessageArray(messages: any[], useSchema: boolean, options?: PublishingOptions): void;

        /**
         * Publish multiple messages, each with a key, headers, and partition Id in a Context object.
         * If Avro serialization is requested, the message must conform to the schema.
         * @param messageContextArray Array of `MessageContext` objects, each containing message, key and headers
         * @param useSchema boolean true if message should be serialized using registered schema selected by provided subject or by default subject name strategy
         * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
         */
        publishMessageContextArray(messageContextArray: MessageContext[], useSchema: boolean, options?: PublishingOptions): void;

        /**
         * Search for messages in the topic
         * @param useSchema true if try to deserialize message using registered schema.
         * @param maxResults Maximum number of messages to return.
         * @param filter Callback function accepting `MessageContext` object and returning boolean value.
         * @param options SearchOptions object defining search direction, timestamp limits, and PartitionOffset limits. 
         * For intellisense use helper objects `new KafkaMagic.SearchOptions();` and `new KafkaMagic.PartitionOffset(partitionId, minOffset, maxOffset);`
         * @returns Array of `MessageContext` objects.
         */
        search(useSchema: boolean, maxResults: number, filter?: SearchFilterFunction, options?: SearchOptions): MessageContext[];

        /**
         * Process messages in the topic using `processor` callback function.
         * @param useSchema true if try to deserialize message using registered schema.
         * @param maxResults Maximum number of messages to process.
         * @param processor Callback function accepting `MessageContext` object. Any return value will be ignored
         * @param options SearchOptions object defining search direction, timestamp limits, and PartitionOffset limits.
         * For intellisense use helper objects `new KafkaMagic.SearchOptions();` and `new KafkaMagic.PartitionOffset(partitionId, minOffset, maxOffset);`
         * @returns nothing.
         */
        process(useSchema: boolean, maxResults: number, processor?: ProcessorFunction, minTimestamp?: Date, maxTimestamp?: Date,
            partitionOffsets?: PartitionOffset[]): void;

        /**
         * Get topic statistics: min/max offsets and timestamps for each partition
         * @returns Array of `PartitionStats` objects
         */
        getStats(): PartitionStats[];

        /**
         * Delete topic and data
         */
        delete(): void;
    }

    type SearchFilterFunction = (Context: MessageContext) => boolean;
    type ProcessorFunction = (Context: MessageContext) => void;

    class SearchOptions {
        /**
         * Search direction, by default accending - in order of messages published
         */
        descending: boolean;

        /**
         * Date, string, or number - Minimum message timestamp to limit search scope to limit search scope by time
         */
        minTimestamp: Date;

        /**
         * Date, string, or number - Maximum message timestamp to limit search scope to limit search scope by time
         */
        maxTimestamp: Date;

        /**
         * Array of `PartitionOffset` objects to limit search scope by partitions and offsets
         */
        partitionOffsets: PartitionOffset[];
    }

    class PublishingOptions {
        /**
         * Id of the registered schema used to serialize message
         */
        SchemaId: number;

        /**
         * Subject of the registered schema used to serialize message
         */
        SchemaSubject: string;

        /**
         * SubjectNameStrategy for the producer. Available values: `None`, `Topic`, `Record`, `TopicRecord`. Any other value considered `None`.
         */
        SubjectNameStrategy: string;

        /**
         * Id of the partition to which the message should be published
         */
        PartitionId: number;

        /**
         * Compression type. Allowed values: 'None', 'Gzip', 'Snappy', 'Lz4', 'Zstd'
         */
        CompressionType: string;

        /**
         * Compression level. Allowed values depend on the Compression type
         */
        CompressionLevel: number;
    }

    class PartitionOffset {
        PartitionId: number;
        MinOffset: number;
        MaxOffset: number;
    }

    interface Metadata {
        Brokers: BrokerMetadata[];
        Topics: TopicMetadata[];
        OriginatingBrokerId: number;
        OriginatingBrokerName: string;
    }

    interface BrokerMetadata {
        BrokerId: number;
        Host: string;
        Port: number;
    }

    interface TopicMetadata {
        Topic: string;
        Partitions: PartitionMetadata[];
        Error: MetadataError;
    }

    interface PartitionStats {
        PartitionId: number;
        MinOffset: number;
        MinTimestamp: Date;
        MaxOffset: number;
        MaxTimestamp: Date;
    }

    interface PartitionMetadata {
        PartitionId: number;
        Leader: number;
        Replicas: number[];
        InSyncReplicas: number[];
        Error: MetadataError;
    }

    interface MetadataError {
        ErrorCode: number;
        IsFatal: boolean;
        Reason: string;
    }

    class SchemaRegistryConfiguration {
        SchemaRegistryBasicAuthCredentialsSource: string;
        SchemaRegistryUrl: string;
        SchemaRegistryRequestTimeoutMs: number;
        SchemaRegistryMaxCachedSchemas: number;
        SchemaRegistryBasicAuthUserInfo: string;
        AutoRegisterSchemas: boolean;
    }

    class KafkaClusterConfiguration {
        ClusterId: string;
        ClusterName: string;
        SchemaRegistry: SchemaRegistryConfiguration;
        LogConnectionClose: boolean;
        InternalTerminationSignal: number;
        ApiVersionRequest: boolean;
        ApiVersionRequestTimeoutMs: number;
        ApiVersionFallbackMs: number;
        BrokerVersionFallback: string;
        SecurityProtocol: string;
        SslCipherSuites: string;
        SslCurvesList: string;
        SslSigalgsList: string;
        SslKeyLocation: string;
        SslKeyPassword: string;
        SslCertificateLocation: string;
        SslCaLocation: string;
        SslCrlLocation: string;
        SslKeystoreLocation: string;
        SslKeystorePassword: string;
        SaslKerberosServiceName: string;
        SaslKerberosPrincipal: string;
        SaslKerberosKinitCmd: string;
        SaslKerberosKeytab: string;
        SaslKerberosMinTimeBeforeRelogin: number;
        SaslUsername: string;
        LogThreadName: boolean;
        LogQueue: boolean;
        StatisticsIntervalMs: number;
        ReconnectBackoffMaxMs: number;
        SaslMechanism: string;
        Acks: string;
        ClientId: string;
        BootstrapServers: string;
        MessageMaxBytes: number;
        MessageCopyMaxBytes: number;
        ReceiveMessageMaxBytes: number;
        MaxInFlight: number;
        MetadataRequestTimeoutMs: number;
        TopicMetadataRefreshIntervalMs: number;
        MetadataMaxAgeMs: number;
        SaslPassword: string;
        TopicMetadataRefreshFastIntervalMs: number;
        TopicBlacklist: string;
        Debug: string;
        SocketTimeoutMs: number;
        SocketSendBufferBytes: number;
        SocketReceiveBufferBytes: number;
        SocketKeepaliveEnable: boolean;
        SocketNagleDisable: boolean;
        SocketMaxFails: number;
        BrokerAddressTtl: number;
        BrokerAddressFamily: string;
        ReconnectBackoffMs: number;
        TopicMetadataRefreshSparse: boolean;
        PluginLibraryPaths: string;
        FetchErrorBackoffMs: number;
        FetchMinBytes: number;
        FetchMaxBytes: number;
        MaxPartitionFetchBytes: number;
        FetchWaitMaxMs: number;
        QueuedMaxMessagesKbytes: number;
        QueuedMinMessages: number;
        EnableAutoOffsetStore: boolean;
        AutoCommitIntervalMs: number;
        MaxPollIntervalMs: number;
        EnablePartitionEof: boolean;
        CoordinatorQueryIntervalMs: number;
        GroupProtocolType: string;
        HeartbeatIntervalMs: number;
        SessionTimeoutMs: number;
        PartitionAssignmentStrategy: string;
        GroupId: string;
        AutoOffsetReset: string;
        ConsumeResultFields: string;
        EnableAutoCommit: boolean;
        CheckCrcs: boolean;
        ConsumeBufferLength: number;
    }
}