1. Command Summary
There are six functional categories of KCE commands.
Category 1. Scripts for managing Kafka topics:
- list_topics.sh: list all topics.
- desc_topic.sh: describe details of a topic.
- create_topic.sh: create a new topic.
- delete_topic.sh: delete a topic.
Category 2. Scripts for managing Access Control List (ACL):
- list_acls.sh: list all ACL rules.
- create_acl_for_producer.sh: create ACL rules to allow a user to produce data to a topic.
- delete_acl_for_producer.sh: revoke ACL rules allowing a user to produce data to a topic.
- create_acl_for_consumer.sh: create ACL rules to allow a user to consume data from a topic.
- delete_acl_for_consumer.sh: revoke ACL rules allowing a user to consume data from a topic.
- create_acl_for_topic.sh: create ACL rules to allow a user to create/delete/describe topics.
- delete_acl_for_topic.sh: revoke ACL rules allowing a user to create/delete/describe topics.
- create_acl_for_groups.sh: create ACL rules to allow a user to create/delete/describe consumer groups. (To consume a topic, a user must get a consumer group from Kafka.)
- delete_acl_for_groups.sh: revoke ACL rules allowing a user to create/delete/describe consumer groups.
- create_acl_for_txid.sh: create ACL rules to allow transactional-id behavior for a user. (With the transactional-id permission, the user can write to a topic transactionally.)
- delete_acl_for_txid.sh: revoke ACL rules allowing transactional-id behavior for a user.
Category 3. Scripts for data:
- produce.sh: produce data to a topic.
- consume.sh: consume data from a topic.
Category 4. Scripts for managing reader offsets:
- list_reader_offsets.sh: list reader offsets for all readers. (The offsets are stored in Kafka-Connect metadata. This command lists all the metadata records.)
- set_reader_offset.sh: modify the Kafka-Connect metadata record to change reader offset.
Category 5. Scripts for managing consumer groups:
- list_consumer_groups.sh: list all consumer groups.
- desc_consumer_group.sh: describe details of a consumer group.
- set_consumer_offset.sh: set the offset of a consumer group (so that the consumer can rewind and consume previously consumed data again).
- delete_consumer_group.sh: delete the consumer group.
Category 6. Scripts for managing Kafka connectors:
- list_plugins.sh: list installed plugins in Kafka-Connect.
- list_connectors.sh: list active connectors.
- connector_status.sh: show the status of a connector.
- connector_config.sh: show the configuration of a connector.
- connector_pause.sh: pause a connector.
- connector_resume.sh: resume a paused connector.
- connector_restart.sh: restart a connector.
- create_connector.sh: start a connector.
- set_connector_config.sh: change the configuration of a connector.
- delete_connector.sh: delete a connector.
2. The User Names
All KCE commands need a username. There are three types of users: admin user, producer user, and consumer user. Your KCE has created properties files for them.
If you forget the names of these users, check the files in the properties directory in the KCE. In the internal KCE, for example, this directory is “/app/properties.”
root@80cd49139150:/# cd /app/properties
root@80cd49139150:/app/properties# ls
admin_jdoe.properties  consumer_april.properties  producer_jeff.properties
root@80cd49139150:/app/properties#
In this example, you can see “jdoe” is an admin user, “jeff” is a producer user, and “april” is a consumer user.
In a local KCE, the properties directory is in the directory where you installed KCE.
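The naming convention above can be recovered mechanically. The following is a minimal sketch that derives each user's role from the properties file names; the file list is hard-coded to mirror the example, and with a real KCE you would substitute the files found in your properties directory.

```shell
# Sketch: derive role and username from ROLE_USERNAME.properties file names.
# The list below mirrors the example directory; replace it with the files
# found under your own properties directory.
for f in admin_jdoe.properties consumer_april.properties producer_jeff.properties; do
  base=${f%.properties}   # strip the extension
  role=${base%%_*}        # text before the first "_" is the role
  user=${base#*_}         # text after the first "_" is the username
  echo "$user is a $role user"
done
```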
3. Category 1. Managing Topics
- list_topics.sh
The “list_topics.sh” script allows you to list all topics in the Kafka system, like the following example.
% ./list_topics.sh jdoe
__consumer_offsets
connect-configs
connect-offsets
connect-status
t1
This command requires one argument: the admin username.
- desc_topic.sh
The script “desc_topic.sh” displays details of a topic, like the following example.
% ./desc_topic.sh jdoe t1
Topic: t1  PartitionCount: 2  ReplicationFactor: 2  Configs: segment.bytes=1073741824
  Topic: t1  Partition: 0  Leader: 2  Replicas: 2,1  Isr: 2,1
  Topic: t1  Partition: 1  Leader: 1  Replicas: 1,0  Isr: 1,0
This script requires two arguments: the admin username and the topic. Run it without arguments to see the usage hint.
% ./desc_topic.sh
Usage: ./desc_topic.sh ADMIN_USER TOPIC
The most important stats in the output are the “Isr” numbers. They show the replicas that are in sync. If a topic constantly has out-of-sync replicas, your Kafka system may have unstable brokers, and you should investigate and remediate immediately.
You can also use this script to see the number of partitions in the topic.
- create_topic.sh
The “create_topic.sh” script helps you create a topic. The following is its format.
Usage: ./create_topic.sh ADMIN_USER TOPIC [ PARTITIONS [ RF [ COMPACT-Y-N ] ] ]
where PARTITIONS is the number of partitions in the topic, RF is the replication factor (i.e., the number of replicas of the topic), and COMPACT-Y-N is a flag indicating if the topic should be compacted (“Y”) or not (“N”).
The ADMIN_USER and TOPIC are required arguments. Here is an example.
% ./create_topic.sh jdoe t2
Created topic t2.
If you do not see the “Created topic …” message, the creation failed. You may comment out the text “2>/dev/null” in the script and run it again; this time, you should see some diagnostic messages.
- delete_topic.sh
The script “delete_topic.sh” allows you to delete a topic. Its usage is as follows.
% ./delete_topic.sh
Usage: ./delete_topic.sh ADMIN_USER TOPIC
There will be no message returned if this script is successful. Run list_topics.sh to see if the topic is actually deleted.
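Since the script prints nothing on success, you may want to script the verification step. Here is a minimal sketch; `list_topics_output` stands in for the real output of `./list_topics.sh jdoe`, so this only illustrates the check itself.

```shell
# Sketch: confirm a deletion by checking the topic no longer appears in the list.
# list_topics_output stands in for the real output of `./list_topics.sh jdoe`.
list_topics_output='__consumer_offsets
connect-configs
connect-offsets
connect-status
t1'
topic="t2"
if printf '%s\n' "$list_topics_output" | grep -qx "$topic"; then
  echo "$topic still exists"
else
  echo "$topic deleted"
fi
```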
4. Category 2: Managing ACLs
- list_acls.sh
The script “list_acls.sh” is for displaying all current Access Control List (ACL) entries. Its format is as follows.
% ./list_acls.sh
Usage: ./list_acls.sh ADMIN_USER [TOPIC]
The TOPIC argument is optional. If present, the script will only display entries related to the topic.
Here is an example of the interaction.
% ./list_acls.sh jdoe t1
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=t1, patternType=LITERAL)`:
  (principal=User:april, host=*, operation=CREATE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=CREATE, permissionType=ALLOW)
  (principal=User:april, host=*, operation=DESCRIBE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=DESCRIBE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=WRITE, permissionType=ALLOW)
  (principal=User:april, host=*, operation=WRITE, permissionType=ALLOW)
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
- create_acl_for_producer.sh
The script “create_acl_for_producer.sh” is for granting the producer privilege to a user. Its format is as follows.
% ./create_acl_for_producer.sh
Usage ./create_acl_for_producer.sh ADMIN_USER USER TOPIC
Meaning: ADMIN_USER grants USER the producer privilege on TOPIC
And here is an example interaction.
% ./create_acl_for_producer.sh jdoe jeff t2
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=t2, patternType=LITERAL)`:
  (principal=User:jeff, host=*, operation=WRITE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=DESCRIBE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=CREATE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=t2, patternType=LITERAL)`:
  (principal=User:jeff, host=*, operation=CREATE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=WRITE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=DESCRIBE, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=TRANSACTIONAL_ID, name=*, patternType=LITERAL)`:
  (principal=User:jeff, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TRANSACTIONAL_ID, name=*, patternType=LITERAL)`:
  (principal=User:jeff, host=*, operation=ALL, permissionType=ALLOW)
  (principal=User:april, host=*, operation=ALL, permissionType=ALLOW)
- delete_acl_for_producer.sh
The script “delete_acl_for_producer.sh” is for revoking the producer privilege from a user. Its format is:
% ./delete_acl_for_producer.sh
Usage ./delete_acl_for_producer.sh ADMIN_USER USER TOPIC
And here is an example interaction.
% ./delete_acl_for_producer.sh jdoe jeff t2
Are you sure you want to remove ACLs:
  (principal=User:jeff, host=*, operation=WRITE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=CREATE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=DESCRIBE, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=TOPIC, name=t2, patternType=LITERAL)`? (y/n)
y
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=t2, patternType=LITERAL)`:
  (principal=User:april, host=*, operation=DESCRIBE, permissionType=ALLOW)
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
Are you sure you want to remove ACLs:
  (principal=User:jeff, host=*, operation=ALL, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=TRANSACTIONAL_ID, name=*, patternType=LITERAL)`? (y/n)
y
Current ACLs for resource `ResourcePattern(resourceType=TRANSACTIONAL_ID, name=*, patternType=LITERAL)`:
  (principal=User:april, host=*, operation=ALL, permissionType=ALLOW)
After this, sending a record by user “jeff” will hit this error:
ERROR [Producer clientId=console-producer] Topic authorization failed for topics [t2] (org.apache.kafka.clients.Metadata)
The above error will be visible if you make the script verbose by commenting out the text “2>/dev/null”.
- create_acl_for_consumer.sh
The script “create_acl_for_consumer.sh” is for granting the consumer privilege to a user. Its format is as follows.
% ./create_acl_for_consumer.sh
Usage ./create_acl_for_consumer.sh ADMIN_USER USER TOPIC
Meaning: ADMIN_USER grants USER the consumer privilege on TOPIC
And here is an example interaction.
% ./create_acl_for_consumer.sh jdoe april t2
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=t2, patternType=LITERAL)`:
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
  (principal=User:april, host=*, operation=DESCRIBE, permissionType=ALLOW)
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
  (principal=User:bk, host=*, operation=ALL, permissionType=ALLOW)
  (principal=User:kc, host=*, operation=ALL, permissionType=ALLOW)
  (principal=User:kc_consumer, host=*, operation=ALL, permissionType=ALLOW)
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
  (principal=User:april, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=t2, patternType=LITERAL)`:
  (principal=User:jeff, host=*, operation=CREATE, permissionType=ALLOW)
  (principal=User:april, host=*, operation=DESCRIBE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=DESCRIBE, permissionType=ALLOW)
  (principal=User:jeff, host=*, operation=WRITE, permissionType=ALLOW)
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
After this, user “april” can read from topic “t2.”
- delete_acl_for_consumer.sh
The script “delete_acl_for_consumer.sh” is for revoking the consumer privilege from a user. Its format is:
% ./delete_acl_for_consumer.sh
Usage ./delete_acl_for_consumer.sh ADMIN_USER USER TOPIC
And here is an example interaction.
% ./delete_acl_for_consumer.sh jdoe april t2
Are you sure you want to remove ACLs:
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
  (principal=User:april, host=*, operation=DESCRIBE, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=TOPIC, name=t2, patternType=LITERAL)`? (y/n)
y
Are you sure you want to remove ACLs:
  (principal=User:april, host=*, operation=READ, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`? (y/n)
y
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
  (principal=User:kc, host=*, operation=ALL, permissionType=ALLOW)
  (principal=User:bk, host=*, operation=ALL, permissionType=ALLOW)
  (principal=User:april, host=*, operation=ALL, permissionType=ALLOW)
  (principal=User:kc_consumer, host=*, operation=ALL, permissionType=ALLOW)
The script double-checks with you before removing ACL entries. After this, reading a record as user “april” will hit this error:
ERROR [Consumer clientId=consumer-g1-1, groupId=g1] Topic authorization failed for topics [t2] (org.apache.kafka.clients.Metadata)
The above error will be visible if you make the script verbose by commenting out the text “2>/dev/null”.
- create_acl_for_topic.sh
The script “create_acl_for_topic.sh” is for granting fine-grained permissions on a topic. Its format is as follows.
% ./create_acl_for_topic.sh
Usage ./create_acl_for_topic.sh ADMIN_USER USER TOPIC OPERATION
Meaning: ADMIN_USER grants USER permission to perform OPERATION on TOPIC.
Valid operations are:
Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All
- delete_acl_for_topic.sh
The script “delete_acl_for_topic.sh” is for revoking fine-grained permissions on a topic from a user. Its format is as follows.
% ./delete_acl_for_topic.sh
Usage ./delete_acl_for_topic.sh ADMIN_USER USER TOPIC OPERATION
Valid operations are:
Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All
- create_acl_for_groups.sh
The script “create_acl_for_groups.sh” is for allowing a user to create consumer groups. Its format is as follows.
% ./create_acl_for_groups.sh
Usage ./create_acl_for_groups.sh ADMIN_USER USER
Meaning: ADMIN_USER grants permission to USER to create groups.
- delete_acl_for_groups.sh
The script “delete_acl_for_groups.sh” is for revoking the create-group permission from a user. Its format is as follows.
% ./delete_acl_for_groups.sh
Usage ./delete_acl_for_groups.sh ADMIN_USER USER
- create_acl_for_txid.sh
The script “create_acl_for_txid.sh” is for granting a user permission to perform transactional writes to a topic. Its format is as follows.
% ./create_acl_for_txid.sh
Usage ./create_acl_for_txid.sh ADMIN_USER USER
Meaning: ADMIN_USER grants USER permission to perform transactional write to topics
- delete_acl_for_txid.sh
The script “delete_acl_for_txid.sh” is for revoking the transactional write permission from a user.
% ./delete_acl_for_txid.sh
Usage ./delete_acl_for_txid.sh ADMIN_USER USER
5. Category 3: Accessing Data
- produce.sh
% ./produce.sh jeff topic1
>
In this example, the first argument is a producer user name. The second argument is the topic to which you will write. When you see the prompt “>,” you can start typing data. Enter each record (a key-value pair) on one line. Use the vertical bar “|” to separate the key and value, like the following example.
% ./produce.sh jeff topic1
>1001|this is the value of the first record
>1002|this is the value of the second record
>
Both key and value are strings. Make sure there is no vertical bar in the key. It is OK if the value contains vertical bars. Duplicate keys will not cause errors or rejections. The superuser must grant the user (“jeff” in this example) the producer permissions, or this command will fail. See the section about the “create_acl_for_producer.sh” script to learn how to grant the producer privilege. You can type “control-c” or “control-d” to stop the produce.sh script. If you need help with the “produce.sh” script, run it without arguments and you will see a usage hint.
% ./produce.sh
Usage: ./produce.sh PRODUCER_USER TOPIC
All other scripts give you help messages in this way, too. There may be messages (warnings or errors) suppressed by the script. You may edit the script and comment out the following text:
2>/dev/null
That is, put a “#” before the “2” and turn it into
#2>/dev/null
After that, the command will become more verbose. You can also modify the script to change the vertical bar delimiter to any other character.
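If you prefer not to edit by hand, the same toggle can be done with sed. The sketch below works on a throwaway copy containing a stand-in command line, so nothing real is modified; point it at the actual script file to use it.

```shell
# Sketch: make a KCE script verbose by commenting out its stderr redirect.
# This operates on a throwaway file with a stand-in line; substitute the
# path of the real script to apply the change for real.
script=$(mktemp)
printf '%s\n' 'kafka-console-producer --topic t1 2>/dev/null' > "$script"
# Put a "#" before "2>/dev/null" so stderr is no longer discarded.
sed -i 's|2>/dev/null|#2>/dev/null|' "$script"
verbose_line=$(cat "$script")
echo "$verbose_line"
rm -f "$script"
```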
- consume.sh
The “consume.sh” script allows you to read records from a Kafka topic. An example is as follows.
% ./consume.sh april t1 g0
1001|this is the value of the first record
1002|this is the value of the second record
The first argument (“april” in this example) is a consumer user name, and the second argument is a Kafka topic. The third argument is a consumer group name which is an arbitrary string.
The output contains one record per line with a vertical bar as the delimiter between key and value.
The superuser must grant the user (“april” in this example) the consumer permissions. See the section about the “create_acl_for_consumer.sh” script to learn how to grant the consumer privilege.
Like “produce.sh,” you can also comment out the following text to enable verbose output.
2>/dev/null
You can also change the delimiter.
To stop the script, type “control-c.” Note that “control-d” does not work for the “consume.sh” script.
Continue with the above example. If you run the “consume.sh” script again, you will not see any records displayed. That is because the consumer group (“g0” in this example) has read the first two records, and the Kafka system has recorded this fact using a “consumer group offset.”
Keep the “consume.sh” running. In another window, send one more record to the Kafka topic:
% ./produce.sh jeff t1
>1003|this is the value of the third record
You will see that “consume.sh” receives this new record almost instantly:
% ./consume.sh april t1 g0
1003|this is the value of the third record
Now stop the “consume.sh” and run it again with a new group name. You can see all records in the topic are retrieved. This is because the starting consumer group offset for a new group (“g1” in the following example) is zero.
% ./consume.sh april t1 g1
1001|this is the value of the first record
1002|this is the value of the second record
1003|this is the value of the third record
So you can reread the content of a topic multiple times using different consumer groups. Creating a new consumer group adds little overhead to Kafka because all it needs to do is record the offsets.
If you are a superuser of the Kafka system, you can check the consumer group offsets using the script “desc_consumer_group.sh.” You can also modify them using the script “set_consumer_offset.sh.”
6. Category 4: Managing Reader Offsets
- list_reader_offsets.sh
The script “list_reader_offsets.sh” retrieves the reader offsets from the Kafka metadata repository. The format of this script is as follows.
% ./list_reader_offsets.sh
Usage: ./list_reader_offsets.sh ADMIN_USER
Here is an example of interaction.
% ./list_reader_offsets.sh jdoe
Key (85 bytes): ["expense-log-1",{"gcs_url":"gs://mybucket/testdata/binary/data1.bi","topic":"log1"}]
Value (15 bytes): {"position":34}
Timestamp: 1608999925993  Partition: 0  Offset: 0
Key (84 bytes): ["expense-log-2",{"gcs_url":"gs://mybucket/testdata/text/data2.csv","topic":"log2"}]
Value (15 bytes): {"position":15}
Timestamp: 1608999925993  Partition: 0  Offset: 0
% Reached end of topic connect-offsets [0] at offset 1
% Reached end of topic connect-offsets [1] at offset 0
Kafka-Connect keeps the above metadata about where the reader has progressed. Each reader has a unique key. With the key, you can find the current offset of the reader. For example, for the reader named “expense-log-1,” its key is
["expense-log-1",{"gcs_url":"gs://mybucket/testdata/binary/data1.bi","topic":"log1"}]
And according to the reader offset list, this reader has progressed to position 34.
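When scripting around this output, the position can be pulled out of the value record with plain shell string operations. This is a minimal sketch; `value` is hard-coded to the example record above, and the parsing assumes exactly the `{"position":N}` shape shown.

```shell
# Sketch: extract the numeric position from a reader-offset value record.
# The {"position":N} shape is taken from the list_reader_offsets.sh output above.
value='{"position":34}'
position=${value#*:}      # drop everything up to and including the colon
position=${position%\}}   # drop the trailing brace
echo "reader is at position $position"
```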
- set_reader_offset.sh
The script “set_reader_offset.sh” modifies the reader offset. Its format is as follows.
% ./set_reader_offset.sh
Usage: ./set_reader_offset.sh ADMIN_USER PARTITION KEY VALUE
This script is important if you need to rewind a reader to read from an earlier offset.
For example, suppose you have deployed a reader to read a text file on Google Storage. Over time, the file has grown to an unmanageable size; you want to truncate it to zero after archiving the current content. With this script, you can accomplish that goal: truncate the file, then reset the reader offset to zero with this command:
% ./set_reader_offset.sh jdoe 0 \
  '["expense-log-1",{"gcs_url":"gs://mybucket/testdata/binary/data1.bi","topic":"log1"}]' \
  '{"position":0}'
With this command, the admin user “jdoe” sets the key to a new value in metadata partition 0. Note that both the key and the value are enclosed in single quotes.
7. Category 5: Managing Consumer Groups
- list_consumer_groups.sh
The “list_consumer_groups.sh” script allows you to list all consumer groups in the Kafka system, like the following example.
% ./list_consumer_groups.sh jdoe
g0
g1
This command requires one argument: the admin user name.
- desc_consumer_group.sh
The script “desc_consumer_group.sh” displays details of a consumer group, like the following example.
% ./desc_consumer_group.sh jdoe g0
GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
g0     t1     1          2               2               0    -            -     -
g0     t1     0          2               2               0    -            -     -
This script requires two arguments: the admin user name and a consumer group name.
The most important stats in the output are “CURRENT-OFFSET” and “LAG.” The current offset shows how many records the consumer has read successfully, and the lag is the number of records the consumer is still lagging behind. A large or steadily increasing lag signals a serious problem in the Kafka system.
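The relationship between these columns is simple arithmetic, which can be handy when scripting health checks. A minimal sketch with illustrative numbers:

```shell
# Sketch: LAG is LOG-END-OFFSET minus CURRENT-OFFSET for a partition.
# The numbers here are illustrative, not taken from a live system.
current_offset=2
log_end_offset=5
lag=$((log_end_offset - current_offset))
echo "lag: $lag"
```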
- set_consumer_offset.sh
The script “set_consumer_offset.sh” lets you point the consumer group offset at any position in the topic. The following is its format.
% ./set_consumer_offset.sh
Usage: ./set_consumer_offset.sh ADMIN_USER TOPIC[:PARTITION] GROUP OFFSET
where PARTITION is optional.
Here is an example interaction.
% ./set_consumer_offset.sh jdoe t1:0 g0 1
GROUP  TOPIC  PARTITION  NEW-OFFSET
g0     t1     0          1
If the script is successful, the new offset information is displayed. Now you can check the stats again with the desc_consumer_group.sh script.
% ./desc_consumer_group.sh jdoe g0
GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
g0     t1     1          2               2               0    -            -     -
g0     t1     0          1               2               1    -            -     -
Note that in this example, consumer group “g0” now lags behind by one record. You can now run the consume.sh script to read an old record again.
This script is useful for correcting errors and reprocessing old data.
- delete_consumer_group.sh
The script “delete_consumer_group.sh” allows you to delete a consumer group. Its usage is as follows.
% ./delete_consumer_group.sh
Usage: ./delete_consumer_group.sh ADMIN_USER CONSUMER_GROUP
Here is an example interaction.
% ./delete_consumer_group.sh jdoe g1
Deletion of requested consumer groups ('g1') was successful.
8. Category 6: Managing Kafka Connectors
- list_plugins.sh
The script “list_plugins.sh” gets the list of all plugins installed in Kafka-Connect. Its format is as follows.
% ./list_plugins.sh
Usage: ./list_plugins.sh ADMIN_USER KAFKA_CONNECT_IP
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
Here is an example of interaction.
% ./list_plugins.sh jdoe 34.105.23.40
[
  {
    "class": "com.dcs.kafka.sink_connectors.apitarget.ApiTargetSinkConnector",
    "type": "sink",
    "version": "2.8.0"
  },
  {
    "class": "com.dcs.kafka.sink_connectors.bq.BqSinkConnector",
    "type": "sink",
    "version": "2.8.0"
  },
  {
    "class": "com.dcs.kafka.sink_connectors.gcs.object.GcsRecordFileSinkConnector",
    "type": "sink",
    "version": "2.8.0"
  },
  {
  ...
If you have uploaded custom connectors to the custom connector URL, Kafka-Connect will try to load them as plugins. Your plugins should appear in the above list if Kafka-Connect can load them successfully.
- list_connectors.sh
The script “list_connectors.sh” gets a list of currently active Kafka connectors. Its format is as follows.
% ./list_connectors.sh
Usage: ./list_connectors.sh ADMIN_USER KAFKA_CONNECT_IP
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
Here is an example of interaction.
% ./list_connectors.sh jdoe 34.105.23.40
[ "expense-log-1", "my_connector" ]
The response is a JSON array of connector names.
Calabash deploys readers and writers in Kafka-Connect as connectors. The list also includes your custom connectors.
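Because the response is a flat JSON array, it is easy to split into one name per line for further scripting (for example, to feed each name to connector_status.sh). In this sketch, `connectors_json` stands in for real list_connectors.sh output; for anything more complex than this flat array, a JSON tool such as jq would be safer than tr.

```shell
# Sketch: split the JSON array from list_connectors.sh into one name per line.
# connectors_json stands in for the real output of ./list_connectors.sh.
connectors_json='[ "expense-log-1", "my_connector" ]'
names=$(printf '%s\n' "$connectors_json" | tr -d '[]" ' | tr ',' '\n')
echo "$names"
```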
- connector_status.sh
The script “connector_status.sh” shows the status of a Kafka connector. Its format is as follows.
% ./connector_status.sh
Usage: ./connector_status.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_NAME
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
Here is an example of interaction.
% ./connector_status.sh jdoe 34.105.23.40 my_connector
{
  "name": "my_connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "34.105.23.40:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "34.105.23.40:8083"
    }
  ],
  "type": "source"
}
- connector_config.sh
The script “connector_config.sh” shows the configuration of a Kafka connector. Its format is as follows.
% ./connector_config.sh
Usage: ./connector_config.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_NAME
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
Here is an example of interaction.
% ./connector_config.sh jdoe 34.105.23.40 my_connector
{
  "name": "my_connector",
  "config": {
    "connector.class": "com.dcs.kafka.source_connectors.gcs.text.GcsTextFileSourceConnector",
    "custom_delims": "\"\\r\\n\", \"\\n\"",
    "charset": "UTF-8",
    "skip_rows": "1",
    "batch_size": "20",
    "gcs_url": "gs://databucket5000/testdata/finance/log1",
    "delim_type": "Standard",
    "name": "my_connector",
    "topic": "log1"
  },
  "tasks": [
    {
      "connector": "my_connector",
      "task": 0
    }
  ],
  "type": "source"
}
- connector_pause.sh
The script “connector_pause.sh” pauses a Kafka connector. Its format is as follows.
% ./connector_pause.sh
Usage: ./connector_pause.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_NAME
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
This command returns no message if all goes well. After this command, you can check the connector status. It should be “PAUSED.”
% ./connector_status.sh jdoe 34.105.23.40 my_connector
{
  "name": "my_connector",
  "connector": {
    "state": "PAUSED",
    "worker_id": "34.105.23.40:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "PAUSED",
      "worker_id": "34.105.23.40:8083"
    }
  ],
  "type": "source"
}
- connector_resume.sh
The script “connector_resume.sh” is for resuming a paused Kafka connector. Its format is as follows.
% ./connector_resume.sh
Usage: ./connector_resume.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_NAME
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
This command returns no message if all goes well.
After this command, you can check the connector status. It should be “RUNNING.”
% ./connector_status.sh jdoe 34.105.23.40 my_connector
{
  "name": "my_connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "34.105.23.40:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "34.105.23.40:8083"
    }
  ],
  "type": "source"
}
- connector_restart.sh
The script “connector_restart.sh” is for restarting a Kafka connector. Its format is as follows.
% ./connector_restart.sh
Usage: ./connector_restart.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_NAME
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
This command returns no message if all goes well.
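A common use of restart is recovering a connector whose status is not RUNNING. The sketch below shows only the decision logic; `connector_state` stands in for the "state" field reported by connector_status.sh, and the restart call is indicated in a comment rather than executed.

```shell
# Sketch: restart a connector only when its reported state is not RUNNING.
# connector_state stands in for the "state" field from connector_status.sh output.
connector_state="FAILED"
if [ "$connector_state" != "RUNNING" ]; then
  action="restart"   # here you would run ./connector_restart.sh ADMIN_USER IP NAME
else
  action="none"
fi
echo "action: $action"
```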
- create_connector.sh
The script “create_connector.sh” starts a new Kafka connector. Its format is as follows.
% ./create_connector.sh
Usage: ./create_connector.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_SPEC
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object. CONNECTOR_SPEC is a JSON object containing the name of the connector and its configuration.
Here is an example create-connector command:
% ./create_connector.sh jdoe 34.105.23.40 '{
  "name": "my_connector",
  "config": {
    "connector.class": "com.mydomain.MyFileSourceConnector",
    "custom_delims": "\"\\r\\n\", \"\\n\"",
    "charset": "UTF-8",
    "skip_rows": "1",
    "batch_size": "20",
    "gcs_url": "gs://mybucket/testdata/finance/log1",
    "delim_type": "Standard",
    "name": "my_connector",
    "topic": "log1"
  }
}'
The command echoes the configuration back if all goes well. Use “create_connector.sh” to manually start your custom connectors. For Calabash readers and writers, use the Calabash CLI to deploy and undeploy them; do not use “create_connector.sh.”
- set_connector_config.sh
The script “set_connector_config.sh” is for modifying a connector’s configuration. Its format is as follows.
% ./set_connector_config.sh
Usage: ./set_connector_config.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_NAME CONFIG
where KAFKA_CONNECT_IP is the IP address of Kafka-Connect service (which listens at port 8083). You can find the Kafka-Connect service IP from the project metadata or from the status of the Kafka infrastructure object.
Here is an example:
% ./set_connector_config.sh jdoe 34.105.23.40 my_connector '{
  "connector.class": "com.mydomain.MyFileSourceConnector",
  "custom_delims": "\"\\r\\n\", \"\\n\"",
  "charset": "UTF-8",
  "skip_rows": "1",
  "batch_size": "20",
  "gcs_url": "gs://mybucket/testdata/finance2/log1",
  "delim_type": "Standard",
  "name": "my_connector",
  "topic": "log1"
}'
The command echoes the configuration back if all goes well. The configuration change takes effect immediately.
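A typical workflow is read-modify-write: fetch the current config with connector_config.sh, edit one field, and pass the result back to set_connector_config.sh. The sketch below shows only the middle step; `config` is a hard-coded stand-in for the "config" object printed by connector_config.sh, and sed swaps in a new gcs_url.

```shell
# Sketch: the "modify" step of a read-modify-write config update.
# config stands in for the "config" object printed by connector_config.sh;
# the resulting new_config would be passed to set_connector_config.sh as CONFIG.
config='{ "gcs_url": "gs://mybucket/testdata/finance/log1", "topic": "log1" }'
new_config=$(printf '%s' "$config" | sed 's|finance/log1|finance2/log1|')
echo "$new_config"
```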
- delete_connector.sh
The script “delete_connector.sh” is for deleting a connector. Its format is as follows.
% ./delete_connector.sh
Usage: ./delete_connector.sh ADMIN_USER KAFKA_CONNECT_IP CONNECTOR_NAME
Note that if a connector is a Calabash reader or writer, do not use this script to delete it. Instead, use Calabash CLI to undeploy the object. The Calabash CLI will take care of cleaning up the cloud resources, including the removal of the connectors.