JDBC Streams

“Simplicity is prerequisite for reliability.”

Edsger W. Dijkstra

In mancher Java Anwendung befinden sich Legacy Code Einschlüsse um JDBC Anfragen, die nicht mehr zeitgemäß erscheinen. Häufig wünscht sich der Entwickler, den recht plumpen Code durch eine elegante Stream Variante zu ersetzen.

In dem Beitrag JPA Tabellen mit reservierten Nummernkreisen wurden IDs aus einer Tabelle ausgelesen, um den größten gefundenen Wert als Basis für zukünftige IDs zu nutzen. Der ursprüngliche Code arbeitet mit einer while-Schleife, die über ein ResultSet iteriert.

try {
  PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql);
  ResultSet rs = session.getJdbcCoordinator().getResultSetReturn().extract(st);
  try {
    long value = 0;
    long next;
    while (rs.next()) {
      next = rs.getLong(1);
      if (legacies.contains(next)) {
        log.info("ignored: {}", next);
        continue;
      }
      log.info("next: {}", next);
      value = Math.max(next, value);
    }
    previousValueHolder.initialize(value).increment();
    sql = null;
    log.info("First free id: {}", previousValueHolder.makeValue());
  } finally {
    session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(rs, st);
    session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(st);
    session.getJdbcCoordinator().afterStatementExecution();
  }
} catch (SQLException sqle) {
  throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle,
      "could not fetch initial value for increment generator", sql);
}

Um den Code zu verschlanken soll das Schleifen Konstrukt durch einen Stream ersetzt werden. Zusätzlich soll der Inhalt des finally-Blocks verschwinden.

Um einen Stream zu verwenden, wird zuerst einmal eine Methode benötigt, die aus einem ResultSet einen Stream erzeugen kann.

public static <T, E extends SQLException> Stream<T> queryAsStream(PreparedStatement stmt,
    WrappedBiFunction<ResultSet, Long, T, E> mapper, Consumer<ResultSet> closer) throws SQLException {
  ResultSet rs = stmt.executeQuery();
  return StreamSupport.stream(new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
    private long index = 0;

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
      try {
        if (!rs.next()) {
          return false;
        }
        action.accept(mapper.apply(rs, index++));
        return true;
      } catch (SQLException e) {
        throw new RuntimeException(e.getMessage(), e);
      }
    }
  }, false).onClose(() -> closer.accept(rs));
}

Das Herzstück der obigen queryAsStream Methode ist der AbstractSpliterator, mit dem die StreamSupport Klasse einen Stream erzeugt. Die tryAdvance Methode liefert false zurück, wenn es keine weiteren Resultate im ResultSet existieren und true, wenn das Ergebnis der Mapping-Methode auf das ResultSet angewendet, an den Consumer aus den Aufrufparametern übergeben wurde.

Der zweite Parameter der stream Methode ist false, damit kann dieser Stream nicht parallel verarbeitet werden. Im Falle sortierter SQL Ergebnisse sicherlich auch keine besonders gute Idee.

Die hier verwendete Mapping-Methode ist eine WrappedBiFunction<ResultSet, Long, T, E>, weil der jeweilig Index des Ergebnisses als Long Parameter mitgegeben wird. In diesem Beispiel nicht notwendig, aber in anderen Anwendungsfällen nützlich. Eine weitere Besonderheit dieses funktionalen Interfaces ist der vierte Parameter E der Exception erweitert.

interface WrappedBiFunction<R, L, T, E extends Exception> {
  T apply(R r, L l) throws E;
}

Auf diese Weise können Lambda Ausdrücke, die eine SQLException werfen, direkt an die queryAsStream Methode übergeben werden.

Zum Ende des Beitrag ist hier noch die veränderte Implementierung mit der queryAsStream Methode gezeigt.

try (PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql);
     Stream<Long> stream = queryAsStream(st, (rs, l) -> rs.getLong(1), b -> close(session, st, b))) {
  long value = stream.filter(not(legacies::contains)).max(naturalOrder()).orElse(0L);
  log.info("value: {}", value);
  previousValueHolder.initialize(value)).increment();
  sql = null;
  log.info("First free id: {}", previousValueHolder.makeValue());
} catch (SQLException sqle) {
  throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle,
      "could not fetch initial value for increment generator", sql);
}

In einem try/resource-Block, wird die Methode queryAsStream aufgerufen und liefert einen Stream vom Typ Long. Der Typ ergibt sich aus der Mapping-Methode (rs, l) -> rs.getLong(1) als zweiter Parameter. Statt einer Schleife mit zwei Variablen und einer bedingten Anweisung, werden im Stream einfach alle Legacy IDs ausgefiltert und der maximale Wert anhand der naturalOrder Methode bestimmt. Die Aufrufe aus dem finally -Block wurden in eine Methode close verschoben und über einen Consumer als dritten Parameter von queryAsStream aufgerufen.

Diese Beispiel zeigt nicht nur, wie gut sich bestehende Anwendungsfälle durch Streams eleganter und damit einfacher und verständlicher formulieren lassen. Auch die Unterstützung der Stream API für die Integration eigenen Stream Quellen ist vorbildlich realisiert.