Ik had hetzelfde probleem, niet zeker of je een oplossing hebt gevonden of niet, maar ik kon iets soortgelijks bereiken door het volgende te doen. Eerst heb ik trigger aan mijn tabel toegevoegd
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Hiermee wordt een trigger voor de tabel ingesteld wanneer een rij wordt bijgewerkt, verwijderd of ingevoegd. Dan roept het de triggerfunctie aan die ik heb ingesteld en die er ongeveer zo uitzag:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Hierdoor kan ik 'luisteren' naar al deze updates van mijn Spring Boot-project en het zal de hele rij als een payload verzenden. Vervolgens heb ik in mijn Spring Boot-project een verbinding met mijn db geconfigureerd.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
Daarmee autowire ik het (injectie met afhankelijkheid) in de constructor in mijn serviceklasse en cast het als volgt naar een r2dbc PostgressqlConnection-klasse:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Nu willen we naar onze tafel 'luisteren' en een melding krijgen wanneer we een update aan onze tafel uitvoeren. Om dat te doen, hebben we een initialisatiemethode opgezet die wordt uitgevoerd na afhankelijkheidsinjectie met behulp van de @PostContruct-annotatie
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Merk op dat we luisteren naar de naam die we in de pg_notify-methode plaatsen. We willen ook een methode opzetten om de verbinding te sluiten wanneer de boon op het punt staat weggegooid te worden, zoals:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Nu maak ik eenvoudig een methode die een Flux retourneert van wat er momenteel in mijn tabel staat, en ik voeg het ook samen met mijn meldingen, zoals ik al zei voordat de meldingen binnenkomen als een json, dus ik moest het deserialiseren en ik besloot om te gebruiken ObjectMapper. Het ziet er dus ongeveer zo uit:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
Ik hoop dat dit helpt. Proost!