[SOLVED] Upgrading Apache Kafka 2.7 to Java 11 Changes authenticationID sent to ZooKeeper Enabling Only 1 Kafka Broker to r/w znodes

The title of this post is a bit of a mouthful and requires some explanation.

I am running a pure open-source version of Kafka (currently 2.7) and am using SASL/GSSAPI connections between all of the brokers and ZooKeeper. Currently, the whole system, including ZooKeeper, is running Java 8, and it is long overdue for an upgrade to Java 11.

Upgrading Kafka to Java 11 causes the broker to send an incorrect authenticationID String to ZooKeeper. As a result, the ACLs on the znodes are set to the hostname of the first Kafka broker that connects to ZooKeeper, and only that one Kafka host is able to r/w the znodes.

Prior to upgrading, the following are the logs from one of the ZooKeeper nodes:

2021-05-17 14:47:35,345 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /172.19.65.22:41698
2021-05-17 14:47:35,357 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@868] - Client attempting to establish new session at /172.19.65.22:41698
2021-05-17 14:47:35,367 [myid:1] - INFO  [CommitProcessor:1:ZooKeeperServer@617] - Established session 0x1797ac46a0f0000 with negotiated timeout 18000 for client /172.19.65.22:41698
2021-05-17 14:47:35,470 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:SaslServerCallbackHandler@118] - Successfully authenticated client: authenticationID=kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET;  authorizationID=kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET.
2021-05-17 14:47:35,474 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:SaslServerCallbackHandler@134] - Setting authorizedID: kafka
2021-05-17 14:47:35,474 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@964] - adding SASL authorization for authorizationID: kafka

The following are the corresponding logs from the Kafka host making the connection:

[2021-05-17 14:47:37,512] INFO Successfully authenticated client: authenticationID=kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET; authorizationID=kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)

The most important part of this is that ZooKeeper sees the authenticationID as kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET. ZooKeeper is configured to remove the host and realm from the authenticationID, which leaves only the service name, ‘kafka’. As a result, every Kafka znode is owned by kafka, which gives all of the hosts in the Kafka cluster r/w access to those znodes. Following are the ZooKeeper configurations that enable this behavior:

kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
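To make the effect of these two options concrete, here is a rough Python model of how an authenticationID of the form primary[/host][@REALM] is reduced to the id that ends up in the znode ACLs. It is a sketch under an assumption: it is not ZooKeeper's actual code, and it ignores any auth_to_local mapping rules ZooKeeper may also apply. The function name derive_acl_id is hypothetical.

```python
import re

# Simplified, hypothetical model of kerberos.removeHostFromPrincipal and
# kerberos.removeRealmFromPrincipal; NOT ZooKeeper's implementation.
# Principal shape assumed: primary[/host][@REALM]
PRINCIPAL_RE = re.compile(r"([^/@]+)(?:/([^@]+))?(?:@(.+))?")


def derive_acl_id(authentication_id, remove_host=True, remove_realm=True):
    """Return the id that would be stored in the 'sasl' ACL entry."""
    match = PRINCIPAL_RE.fullmatch(authentication_id)
    if match is None:
        raise ValueError(f"not a Kerberos principal: {authentication_id!r}")
    primary, host, realm = match.groups()
    acl_id = primary
    # Keep the host/realm components only when the matching option is off.
    if host and not remove_host:
        acl_id += "/" + host
    if realm and not remove_realm:
        acl_id += "@" + realm
    return acl_id


if __name__ == "__main__":
    # Principal sent under Java 8: every broker collapses to 'kafka'.
    print(derive_acl_id("kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET"))
    # Principal sent under the buggy Java 11 JDK: only the hostname survives.
    print(derive_acl_id("vmwqsrdvk02@EXAMPLE.NET"))
```

The first call yields kafka and the second yields vmwqsrdvk02, which is exactly why the ACLs end up owned by a single broker once the authenticationID changes shape.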

The resulting znode ACLs are:

[zk: localhost:2181(CONNECTED) 5] getAcl /brokers
'world,'anyone
: r
'sasl,'kafka
: cdrwa

System Configurations

Following are all of the configurations for both the ZooKeeper and Kafka cluster:

ZooKeeper

zoo.cfg

maxClientCnxns=50
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/data_dir
dataLogDir=/zk/data_log_dir
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
clientPort=2181
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true

server.1=zk2-01.dv.quasar.nadops.net:2888:3888
server.2=zk2-02.dv.quasar.nadops.net:2888:3888
server.3=zk2-03.dv.quasar.nadops.net:2888:3888

jaas.conf

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/zookeeper/conf/vmwqsrdvz201.dv.quasar.nadops.net.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/vmwqsrdvz201.dv.quasar.nadops.net@EXAMPLE.NET";
};

Kafka

broker-jaas.conf

KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/usr/local/kafka/config/vmwqsrdvk01.dv.quasar.nadops.net.keytab"
  storeKey=true
  principal="kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/usr/local/kafka/config/vmwqsrdvk01.dv.quasar.nadops.net.keytab"
  storeKey=true
  serviceName="zookeeper"
  principal="kafka/vmwqsrdvk01.dv.quasar.nadops.net@EXAMPLE.NET";
};

server.properties

Only the relevant configs are included below.

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER_LISTENER:SASL_SSL

inter.broker.listener.name=BROKER_LISTENER
listeners=SASL_SSL://:9092,BROKER_LISTENER://:9093

ssl.keystore.location=/usr/local/kafka/config/kafka01.dv.quasar.nadops.net-keystore.jks
ssl.keystore.password=***
ssl.key.password=***
ssl.truststore.location=/usr/local/kafka/config/kafka01.dv.quasar.nadops.net-truststore.jks
ssl.truststore.password=***
sasl.kerberos.service.name=kafka
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
zookeeper.set.acl=true
super.users=User:kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false

Upgrading Kafka to Java 11

Once Kafka is upgraded to Java 11 (leaving ZooKeeper on Java 8), Kafka no longer sends the expected authenticationID String to ZooKeeper.

Following are the ZooKeeper logs of the same authentication:

2021-05-18 14:31:01,627 [myid:3] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /172.19.65.23:53374
2021-05-18 14:31:01,631 [myid:3] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@868] - Client attempting to establish new session at /172.19.65.23:53374
2021-05-18 14:31:01,636 [myid:3] - INFO  [CommitProcessor:3:ZooKeeperServer@617] - Established session 0x3797fd27e300004 with negotiated timeout 18000 for client /172.19.65.23:53374
2021-05-18 14:31:01,728 [myid:3] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:SaslServerCallbackHandler@118] - Successfully authenticated client: authenticationID=vmwqsrdvk02@EXAMPLE.NET;  authorizationID=kafka/vmwqsrdvk02.dv.quasar.nadops.net@EXAMPLE.NET.
2021-05-18 14:31:01,728 [myid:3] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:SaslServerCallbackHandler@134] - Setting authorizedID: vmwqsrdvk02
2021-05-18 14:31:01,728 [myid:3] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@964] - adding SASL authorization for authorizationID: vmwqsrdvk02

Following are the logs from Kafka, which result in an exception being thrown because it is unable to write to the ZooKeeper znode:

[2021-05-18 14:31:01,410] INFO Initiating client connection, connectString=vmwqsrdvz201.dv.quasar.nadops.net:2181,vmwqsrdvz202.dv.quasar.nadops.net:2181,vmwqsrdvz203.dv.quasar.nadops.net:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@39b43d60 (org.apache.zookeeper.ZooKeeper)
[2021-05-18 14:31:01,414] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2021-05-18 14:31:01,419] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2021-05-18 14:31:01,421] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-05-18 14:31:01,556] INFO Client successfully logged in. (org.apache.zookeeper.Login)
[2021-05-18 14:31:01,573] INFO TGT refresh thread started. (org.apache.zookeeper.Login)
[2021-05-18 14:31:01,577] INFO Client will use GSSAPI as SASL mechanism. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2021-05-18 14:31:01,604] INFO TGT valid starting at:        Tue May 18 14:31:01 UTC 2021 (org.apache.zookeeper.Login)
[2021-05-18 14:31:01,604] INFO TGT expires:                  Wed May 19 00:31:01 UTC 2021 (org.apache.zookeeper.Login)
[2021-05-18 14:31:01,605] INFO TGT refresh sleeping until: Tue May 18 22:32:58 UTC 2021 (org.apache.zookeeper.Login)
[2021-05-18 14:31:01,606] INFO Opening socket connection to server vmwqsrdvz203.dv.quasar.nadops.net/172.19.65.21:2181. Will attempt to SASL-authenticate using Login Context section 'Client' (org.apache.zookeeper.ClientCnxn)
[2021-05-18 14:31:01,614] INFO Socket connection established, initiating session, client: /172.19.65.23:53374, server: vmwqsrdvz203.dv.quasar.nadops.net/172.19.65.21:2181 (org.apache.zookeeper.ClientCnxn)
[2021-05-18 14:31:01,622] INFO Session establishment complete on server vmwqsrdvz203.dv.quasar.nadops.net/172.19.65.21:2181, sessionid = 0x3797fd27e300004, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-05-18 14:31:01,626] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-05-18 14:31:01,731] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:120)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
        at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:564)
        at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1662)
        at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1560)
        at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1(KafkaZkClient.scala:1552)
        at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1$adapted(KafkaZkClient.scala:1552)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at kafka.zk.KafkaZkClient.createTopLevelPaths(KafkaZkClient.scala:1552)
        at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:467)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:233)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
[2021-05-18 14:31:01,733] INFO shutting down (kafka.server.KafkaServer)

The authenticationID now provided by Kafka is vmwqsrdvk02@EXAMPLE.NET (only the hostname and realm), which is no longer the String that ZooKeeper expects. The resulting derived authorizationID is vmwqsrdvk02, which means that only that Kafka host has r/w access to any of the znodes it creates.

Solution

After exhausting search options and finding multiple pages indicating that I had everything configured correctly, even for JDK 11, I posted to the #apache-kafka channel on chat.freenode.net and got a hint: evidently, some JDKs have bugs in their Kerberos implementation.

I was running java-11-openjdk-11.0.8.10-1.el7.x86_64 from CentOS 7 Updates. I double-checked and found that an updated JDK was available.

After updating to the latest Java 11 JDK, everything worked as expected. If the particular JDK that you are using still does not work, try AdoptOpenJDK.

Baseline Settings for a VirtualBox Instance for a GUI under Debian 10

The following is a list of the basic VirtualBox settings to start with when using a VM with a GUI. Make sure that you have also installed the Oracle VM VirtualBox Extension Pack that matches your version of VirtualBox, and that you have installed the Guest Additions in the VM itself.

  • System
    • Motherboard
      • 4+ GB of RAM, or whatever you have available
    • Processor
      • 2+ CPUs (if you have the available cores/threads)
  • Display
    • As much Video Memory as you can spare
    • Graphics Controller: VMSVGA
      • Ensure that 3D Acceleration is enabled
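If you prefer to script these settings instead of clicking through the GUI, the following Python sketch builds a VBoxManage modifyvm command that mirrors the list above. Assumptions: the option names are those of VirtualBox 6.x (check VBoxManage modifyvm --help on your version), the VM name debian10 is hypothetical, and 128 MB is used for video memory because that is the GUI maximum on 6.x.

```python
import subprocess


def modifyvm_args(vm_name, memory_mb=4096, cpus=2, vram_mb=128):
    """Build (but do not run) a VBoxManage command applying the baseline settings."""
    return [
        "VBoxManage", "modifyvm", vm_name,
        "--memory", str(memory_mb),        # System > Motherboard > Base Memory
        "--cpus", str(cpus),               # System > Processor
        "--vram", str(vram_mb),            # Display > Video Memory
        "--graphicscontroller", "vmsvga",  # Display > Graphics Controller
        "--accelerate3d", "on",            # Display > Enable 3D Acceleration
    ]


if __name__ == "__main__":
    # The VM must be powered off for modifyvm to succeed.
    subprocess.run(modifyvm_args("debian10"), check=True)
```

Keeping command construction separate from execution makes it easy to print or review the command before running it against a real VM.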

message=class configured for SSLContext: sun.security.ssl.SSLContextImpl$TLSContext not a SSLContext When Mocking Static Methods in Class

When mocking static methods with JUnit 4, Mockito, and PowerMock, you may see the following log message after annotating your test class if the code that you are testing makes HTTP connections:

message=class configured for SSLContext: sun.security.ssl.SSLContextImpl$TLSContext not a SSLContext

Your annotations for the class (or method) typically include the following:

@RunWith(PowerMockRunner.class)
@PrepareForTest({ SomeClassYouWantToMock.class })

This can be confusing, especially if your code ONLY uses plain HTTP and never touches SSL directly. Add the following to your annotations to tell PowerMock to defer loading of the following classes to the system class loader instead of loading them itself:

@PowerMockIgnore({ "javax.net.ssl.*" })

This should prevent PowerMock's class loader from loading its own copies of the javax.net.ssl classes, which otherwise conflict with the class definitions used by your HTTP library.

Creating a Counter or Progress Bar for a Python Program

I’ve written a number of Python apps that I wanted to print some sort of counter or progress bar to STDOUT, so that I could tell the app was still running instead of having locked up or died without my being able to see it.

I tried a couple of different existing progress bar modules, but none of them really worked outside of a very specific use case.

So, after a bit of futzing around, I came up with a very simple way to print an updating counter to STDOUT. The synchronization code was gleaned from this post; thank you, Daniel.

Following is a working example for Python 3.7. You could write your own implementation of the progress function to render whatever you want to STDOUT.

import threading
import sys
import time

def synchronized(func):
    func.__lock__ = threading.Lock()
    def synced_func(*args, **kws):
        with func.__lock__:
            return func(*args, **kws)
    return synced_func

total = 0

@synchronized
def progress(count):
    global total
    total += count
    sys.stdout.write(f'\rtotal: [{total}]')
    sys.stdout.flush()

for i in range(500):
    progress(1)
    time.sleep(0.01)

print('\nFinished...')

Running this will generate the following output with the total value dynamically updating as the application runs:

(progressbar) rchapin@leviathan:progressbar$ python progressbar.py 
total: [500]
Finished...
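As a variant of the progress function above, the following sketch renders an actual bar with a percentage instead of a running total, and keeps the lock inside a small class rather than on a decorated function. The class name ProgressBar and its parameters are hypothetical; this is one possible approach, not the original program.

```python
import sys
import threading
import time


class ProgressBar:
    """Thread-safe text progress bar written to a stream (STDOUT by default)."""

    def __init__(self, total, width=40, stream=sys.stdout):
        self.total = total
        self.width = width
        self.stream = stream
        self.done = 0
        self._lock = threading.Lock()

    def render(self):
        # '\r' returns the cursor to the start of the line so the bar redraws in place.
        filled = self.width * self.done // self.total
        pct = 100 * self.done // self.total
        return f"\r[{'#' * filled}{'.' * (self.width - filled)}] {pct:3d}%"

    def update(self, count=1):
        # The lock keeps concurrent workers from interleaving increments or writes.
        with self._lock:
            self.done = min(self.done + count, self.total)
            self.stream.write(self.render())
            self.stream.flush()


if __name__ == "__main__":
    bar = ProgressBar(total=500)

    def worker():
        for _ in range(100):
            bar.update(1)
            time.sleep(0.01)

    threads = [threading.Thread(target=worker) for _ in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('\nFinished...')
```

Five threads each report 100 units of work against a total of 500, so the bar should end at 100% regardless of how the threads are scheduled.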

How To Spy and Verify a Static Void Method in Java

The Mockito and PowerMockito libraries for JUnit4 are not always the most intuitive.

Following is an example of how to spy and verify a static void method.

    @Test
    public void testAdd() {

        // Prepare the Utils class to be spied.
        PowerMockito.spy(Utils.class);

        // Run the test and get the actual value from the OUT (object under test)
        int actualValue = App.add("Test1", 1, 1);

        /*
         * To verify the number of times that we called Utils.doSomething we
         * first need to tell the PowerMockito library which class we are
         * verifying and how many times we are verifying that action.
         */
        PowerMockito.verifyStatic(Utils.class, Mockito.times(1));

        /*
         * Then, and this is not at all intuitive, we have to call the method
         * ourselves with the same parameters that we are expecting to have been
         * called. This tells PowerMockito which method invocation is to be
         * verified.
         */
        Utils.doSomething(Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt());

        assertEquals(2, actualValue);
    }

The complete example can be found here.

Configuring rsyslog to rotate log files from log messages streamed to it from a Systemd service

In general, I now write all of my applications to log to STDOUT. This decouples running them, whether on the command line, in an IDE, on a bare-metal box, in a VM, or in a container, from how you store and view the logs. No more maintaining multiple logging configs for each flavor of deployment.

In this particular case, I am running an application in a container (though a container is not required) controlled by systemd, and using rsyslog to forward all of its log messages to a specific output file. When writing log files to a local disk, you must be able to rotate and truncate them by size so that you don’t fill up your disk, either in normal operation or in some error condition that inadvertently generates a large number of log messages in a short period of time.

For the following example, we will use the service identifier my_program_identifier. Replace it with something relevant to your deployment.

To configure your service in this manner you first need to add the appropriate options to the [Service] section of your unit file.

StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=my_program_identifier

Then define an rsyslog.d config file, my_program.conf, as follows:

$outchannel my_program_log_rotation,/var/log/my_program/my_program.log, 1073741824, /etc/my_program/log-rotate.sh

if $programname == "my_program_identifier" then :omfile:$my_program_log_rotation
& stop

In the rsyslog conf file we define an Output Channel. In short, an output channel lets you define the file name to which you want to write, the max size (in bytes), and a command (or path to a script or program) to run when the file reaches that limit.

In the previous example, we declare an output channel with the $outchannel directive and give it the identifier my_program_log_rotation. We then define the path of the log file, the max size (1073741824 bytes, or 1 GiB), and a shell script that will run to rotate the file for us.

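The next line matches each log message tagged with the "my_program_identifier" that we defined in the unit file and writes it to that output channel. The & stop line then discards the message so that no later rsyslog rules process it.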

Following is a working sample of the log-rotate.sh script.

#!/bin/bash

LOG_DIR=/var/log/my_program
FILE_NAME=my_program.log
MAX_NUM_FILES=10

# List the numbered files (my_program.log.1, my_program.log.2, ...) highest
# number first, so each file moves before its target name is taken.
# Note: sorting on the 3rd '.'-separated field assumes LOG_DIR has no dots.
for i in $(ls -1 "$LOG_DIR/${FILE_NAME}".* 2>/dev/null | sort --field-separator=. -k3 -nr)
do
  # Grab the number (last token) for all of the numbered files
  log_num=$(echo "$i" | awk -F\. '{print $NF}')

  # If it is equal to or greater than our max number of files
  # just delete it.
  if [ "$log_num" -ge "$MAX_NUM_FILES" ]
  then
    rm -f "$i"
    continue
  fi

  target_num=$((log_num + 1))
  target_file_name="$LOG_DIR/${FILE_NAME}.${target_num}"
  mv -f "$i" "$target_file_name"
done

mv -f "$LOG_DIR/$FILE_NAME" "$LOG_DIR/${FILE_NAME}.1"

Deploy your updated unit file, your rsyslog.d conf file, and the shell script and you should have it up and running.
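For readers who would rather not parse ls output, here is a Python sketch of the same rotation scheme as log-rotate.sh: shift file.N to file.N+1 starting from the highest number, delete anything at or above the limit, then move the live file to file.1. The function name rotate is hypothetical, and you would still need to point the $outchannel action at a wrapper that invokes it; this is an alternative, not the setup described above.

```python
import os
import re


def rotate(log_dir, file_name, max_num_files=10):
    """Rotate file_name.N -> file_name.N+1 and file_name -> file_name.1."""
    pattern = re.compile(re.escape(file_name) + r"\.(\d+)$")
    numbered = []
    for entry in os.listdir(log_dir):
        match = pattern.match(entry)
        if match:
            numbered.append(int(match.group(1)))

    # Highest number first so no move overwrites a file that has not moved yet.
    for num in sorted(numbered, reverse=True):
        src = os.path.join(log_dir, f"{file_name}.{num}")
        if num >= max_num_files:
            os.remove(src)
            continue
        os.replace(src, os.path.join(log_dir, f"{file_name}.{num + 1}"))

    # Finally move the live log out of the way; rsyslog reopens a fresh file.
    live = os.path.join(log_dir, file_name)
    if os.path.exists(live):
        os.replace(live, os.path.join(log_dir, f"{file_name}.1"))


if __name__ == "__main__":
    rotate("/var/log/my_program", "my_program.log")
```

Matching on a numeric suffix with a regular expression avoids the script's assumption that the log directory path contains no dots.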

[SOLVED] Unable to Sign-In to Gmail with Thunderbird with OAuth2, Keeps Asking for Email or Phone Over and Over

If you are setting up Thunderbird to use your Gmail account, you may find that when Thunderbird opens a new window to a Google web portal in which you are to provide your email address and password, it keeps asking you for your email over and over again and never lets you enter your password.

This occurs when Thunderbird’s privacy settings do not allow it to store cookies.

First, ensure that your Gmail account has Allow less secure apps turned off. Unfortunately, it may take some time for this setting to propagate to your account.

Then, go to Preferences > Privacy and, under Web Content, check the Accept cookies from sites checkbox.

Return to your account settings; when prompted, you should now be able to enter your credentials to grant Thunderbird access to your account.

Mocking an HTTPS RESTful endpoint with Netcat

Netcat is generally known as the TCP/IP Swiss Army knife and is incredibly helpful for both debugging and mocking up network services.

Following is an example of how to set up a mock RESTful service that communicates over HTTPS.

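On the “server” side, run the following command. The -l option instructs Netcat to listen. The --ssl options used here are specific to Ncat, the Netcat implementation that ships with Nmap; the traditional nc does not support them.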

while true; do { echo -e "HTTP/1.1 200 OK\r\nDate: $(date)\r\n\r\n<h1>hello world from $(hostname) on $(date)</h1>" | nc -vl --ssl 8080; } done

On the “client” side, run the following to PUT a sample JSON document:

curl https://localhost:8080/foo/blah -k -XPUT -d @sample.json

Alternatively, you can generate your own key/cert pair, which is useful if you need to test importing certs.

To do so, first generate a self-signed cert and an SSL key without a passphrase for your nc “server”. Place the server.key and server.cert files in /var/tmp/server-cert.

openssl req -nodes -new -x509 -keyout server.key -out server.cert

Then run nc as follows:

while true; do { echo -e "HTTP/1.1 200 OK\r\nDate: $(date)\r\n\r\n<h1>hello world from $(hostname) on $(date)</h1>" | nc -vl --ssl 8080 --ssl-key /var/tmp/server-cert/server.key --ssl-cert /var/tmp/server-cert/server.cert; } done
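If Ncat is not available, a rough equivalent can be put together with nothing but the Python standard library. The sketch below answers any GET or PUT with a 200 and the same kind of HTML body, and wraps the socket in TLS when given the server.cert/server.key pair generated above. The names MockHandler and make_server are hypothetical, and unlike the nc loop it keeps serving without being restarted per request.

```python
import http.server
import socket
import ssl
import threading
from datetime import datetime, timezone


class MockHandler(http.server.BaseHTTPRequestHandler):
    """Replies 200 with a small HTML body to every GET or PUT."""

    def _reply(self):
        # Read the request body (e.g. the PUT of sample.json) and keep it for inspection.
        length = int(self.headers.get("Content-Length", 0))
        self.server.last_body = self.rfile.read(length) if length else b""
        body = (f"<h1>hello world from {socket.gethostname()} on "
                f"{datetime.now(timezone.utc):%c}</h1>").encode()
        self.send_response(200)  # also adds Server and Date headers
        self.send_header("Content-Type", "text/html")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    do_GET = _reply
    do_PUT = _reply


def make_server(port=8080, certfile=None, keyfile=None):
    """Create the mock server; serve HTTPS when a cert/key pair is given."""
    server = http.server.HTTPServer(("localhost", port), MockHandler)
    server.last_body = b""
    if certfile:
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile, keyfile)
        server.socket = ctx.wrap_socket(server.socket, server_side=True)
    return server


if __name__ == "__main__":
    srv = make_server(8080, "/var/tmp/server-cert/server.cert",
                      "/var/tmp/server-cert/server.key")
    threading.Thread(target=srv.serve_forever, daemon=True).start()
    input("Serving https://localhost:8080/ - press Enter to stop\n")
    srv.shutdown()
```

The same curl command shown earlier (with -k for the self-signed cert) should work against it.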