JDBC Streams

“Simplicity is prerequisite for reliability.”

Edsger W. Dijkstra

In mancher Java Anwendung befinden sich Legacy Code Einschlüsse um JDBC Anfragen, die nicht mehr zeitgemäß erscheinen. Häufig wünscht sich der Entwickler, den recht plumpen Code durch eine elegante Stream Variante zu ersetzen.

In dem Beitrag JPA Tabellen mit reservierten Nummernkreisen wurden IDs aus einer Tabelle ausgelesen, um den größten gefundenen Wert als Basis für zukünftige IDs zu nutzen. Der ursprüngliche Code arbeitet mit einer while-Schleife, die über ein

<code>ResultSet</code>
ResultSet iteriert.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
try {
PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql);
ResultSet rs = session.getJdbcCoordinator().getResultSetReturn().extract(st);
try {
long value = 0;
long next;
while (rs.next()) {
next = rs.getLong(1);
if (legacies.contains(next)) {
log.info("ignored: {}", next);
continue;
}
log.info("next: {}", next);
value = Math.max(next, value);
}
previousValueHolder.initialize(value).increment();
sql = null;
log.info("First free id: {}", previousValueHolder.makeValue());
} finally {
session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(rs, st);
session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(st);
session.getJdbcCoordinator().afterStatementExecution();
}
} catch (SQLException sqle) {
throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle,
"could not fetch initial value for increment generator", sql);
}
try { PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql); ResultSet rs = session.getJdbcCoordinator().getResultSetReturn().extract(st); try { long value = 0; long next; while (rs.next()) { next = rs.getLong(1); if (legacies.contains(next)) { log.info("ignored: {}", next); continue; } log.info("next: {}", next); value = Math.max(next, value); } previousValueHolder.initialize(value).increment(); sql = null; log.info("First free id: {}", previousValueHolder.makeValue()); } finally { session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(rs, st); session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(st); session.getJdbcCoordinator().afterStatementExecution(); } } catch (SQLException sqle) { throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle, "could not fetch initial value for increment generator", sql); }
try {
  PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql);
  ResultSet rs = session.getJdbcCoordinator().getResultSetReturn().extract(st);
  try {
    long value = 0;
    long next;
    while (rs.next()) {
      next = rs.getLong(1);
      if (legacies.contains(next)) {
        log.info("ignored: {}", next);
        continue;
      }
      log.info("next: {}", next);
      value = Math.max(next, value);
    }
    previousValueHolder.initialize(value).increment();
    sql = null;
    log.info("First free id: {}", previousValueHolder.makeValue());
  } finally {
    session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(rs, st);
    session.getJdbcCoordinator().getLogicalConnection().getResourceRegistry().release(st);
    session.getJdbcCoordinator().afterStatementExecution();
  }
} catch (SQLException sqle) {
  throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle,
      "could not fetch initial value for increment generator", sql);
}

Um den Code zu verschlanken soll das Schleifen Konstrukt durch einen Stream ersetzt werden. Zusätzlich soll der Inhalt des finally-Blocks verschwinden.

Um einen

Stream
Stream zu verwenden, wird zuerst einmal eine Methode benötigt, die aus einem
ResultSet
ResultSet einen
Stream
Stream erzeugen kann.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public static <T, E extends SQLException> Stream<T> queryAsStream(PreparedStatement stmt,
WrappedBiFunction<ResultSet, Long, T, E> mapper, Consumer<ResultSet> closer) throws SQLException {
ResultSet rs = stmt.executeQuery();
return StreamSupport.stream(new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
private long index = 0;
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
if (!rs.next()) {
return false;
}
action.accept(mapper.apply(rs, index++));
return true;
} catch (SQLException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}, false).onClose(() -> closer.accept(rs));
}
public static <T, E extends SQLException> Stream<T> queryAsStream(PreparedStatement stmt, WrappedBiFunction<ResultSet, Long, T, E> mapper, Consumer<ResultSet> closer) throws SQLException { ResultSet rs = stmt.executeQuery(); return StreamSupport.stream(new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) { private long index = 0; @Override public boolean tryAdvance(Consumer<? super T> action) { try { if (!rs.next()) { return false; } action.accept(mapper.apply(rs, index++)); return true; } catch (SQLException e) { throw new RuntimeException(e.getMessage(), e); } } }, false).onClose(() -> closer.accept(rs)); }
public static <T, E extends SQLException> Stream<T> queryAsStream(PreparedStatement stmt,
    WrappedBiFunction<ResultSet, Long, T, E> mapper, Consumer<ResultSet> closer) throws SQLException {
  ResultSet rs = stmt.executeQuery();
  return StreamSupport.stream(new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
    private long index = 0;

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
      try {
        if (!rs.next()) {
          return false;
        }
        action.accept(mapper.apply(rs, index++));
        return true;
      } catch (SQLException e) {
        throw new RuntimeException(e.getMessage(), e);
      }
    }
  }, false).onClose(() -> closer.accept(rs));
}

Das Herzstück der obigen

queryAsStream
queryAsStream Methode ist der
AbstractSpliterator
AbstractSpliterator, mit dem die
StreamSupport
StreamSupport Klasse einen
Stream
Stream erzeugt. Die
tryAdvance
tryAdvance Methode liefert false zurück, wenn es keine weiteren Resultate im
ResultSet
ResultSet existieren und
true
true, wenn das Ergebnis der Mapping-Methode auf das
ResultSet
ResultSet angewendet, an den
Consumer
Consumer aus den Aufrufparametern übergeben wurde.

Der zweite Parameter der stream Methode ist

false
false, damit kann dieser Stream nicht parallel verarbeitet werden. Im Falle sortierter SQL Ergebnisse sicherlich auch keine besonders gute Idee.

Die hier verwendete Mapping-Methode ist eine

WrappedBiFunction<ResultSet, Long, T, E>
WrappedBiFunction<ResultSet, Long, T, E>, weil der jeweilig Index des Ergebnisses als Long Parameter mitgegeben wird. In diesem Beispiel nicht notwendig, aber in anderen Anwendungsfällen nützlich. Eine weitere Besonderheit dieses funktionalen Interfaces ist der vierte Parameter
E
E der
Exception
Exception erweitert.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
interface WrappedBiFunction<R, L, T, E extends Exception> {
T apply(R r, L l) throws E;
}
interface WrappedBiFunction<R, L, T, E extends Exception> { T apply(R r, L l) throws E; }
interface WrappedBiFunction<R, L, T, E extends Exception> {
  T apply(R r, L l) throws E;
}

Auf diese Weise können Lambda Ausdrücke, die eine

SQLException
SQLException werfen, direkt an die
queryAsStream
queryAsStream Methode übergeben werden.

Zum Ende des Beitrag ist hier noch die veränderte Implementierung mit der

queryAsStream
queryAsStream Methode gezeigt.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
try (PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql);
Stream<Long> stream = queryAsStream(st, (rs, l) -> rs.getLong(1), b -> close(session, st, b))) {
long value = stream.filter(not(legacies::contains)).max(naturalOrder()).orElse(0L);
log.info("value: {}", value);
previousValueHolder.initialize(value)).increment();
sql = null;
log.info("First free id: {}", previousValueHolder.makeValue());
} catch (SQLException sqle) {
throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle,
"could not fetch initial value for increment generator", sql);
}
try (PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql); Stream<Long> stream = queryAsStream(st, (rs, l) -> rs.getLong(1), b -> close(session, st, b))) { long value = stream.filter(not(legacies::contains)).max(naturalOrder()).orElse(0L); log.info("value: {}", value); previousValueHolder.initialize(value)).increment(); sql = null; log.info("First free id: {}", previousValueHolder.makeValue()); } catch (SQLException sqle) { throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle, "could not fetch initial value for increment generator", sql); }
try (PreparedStatement st = session.getJdbcCoordinator().getStatementPreparer().prepareStatement(sql);
     Stream<Long> stream = queryAsStream(st, (rs, l) -> rs.getLong(1), b -> close(session, st, b))) {
  long value = stream.filter(not(legacies::contains)).max(naturalOrder()).orElse(0L);
  log.info("value: {}", value);
  previousValueHolder.initialize(value)).increment();
  sql = null;
  log.info("First free id: {}", previousValueHolder.makeValue());
} catch (SQLException sqle) {
  throw session.getJdbcServices().getSqlExceptionHelper().convert(sqle,
      "could not fetch initial value for increment generator", sql);
}

In einem try/resource-Block, wird die Methode

queryAsStream
queryAsStream aufgerufen und liefert einen Stream vom Typ
Long
Long. Der Typ ergibt sich aus der Mapping-Methode
(rs, l) -> rs.getLong(1)
(rs, l) -> rs.getLong(1) als zweiter Parameter. Statt einer Schleife mit zwei Variablen und einer bedingten Anweisung, werden im Stream einfach alle Legacy IDs ausgefiltert und der maximale Wert anhand der
naturalOrder
naturalOrder Methode bestimmt. Die Aufrufe aus dem finally -Block wurden in eine Methode
close
close verschoben und über einen
Consumer
Consumer als dritten Parameter von
queryAsStream
queryAsStream aufgerufen.

Diese Beispiel zeigt nicht nur, wie gut sich bestehende Anwendungsfälle durch Streams eleganter und damit einfacher und verständlicher formulieren lassen. Auch die Unterstützung der Stream API für die Integration eigenen Stream Quellen ist vorbildlich realisiert.