Spring R2DBC + MySQL
Development/Database

Spring R2DBC + MySQL

반응형

Spring R2DBC

Spring 에서 사용되는 JDBC Datasource는 항상 Blocking 방식으로 제공되어졌다.

한 Query를 수행하고 그에 대한 결과를 받기까지는 항상 Blocking이 되어 해당 쓰레드가 대기하는 문제점이 존재한다.

Reactive Framework가 주로 사용되어 진지 꽤 지났음에도 불구하고 Spring에서 사용하는 Datasource는 아직도 예전의 Servlet의 쓰레드 모델에 멈춰있다는게 항상 답답함을 지울 수 없는 부분이었는데, 인큐베이팅 되던 모듈이 드디어 1.0.0 버전으로 릴리즈 되면서 Spring을 사용하면서도 Async-Nonblokcing Datasource를 함께 사용할 수 있다는 뜻이 된다.

이는 곧, Spring 5 Feature인 Webflux에서 성능적인 측면/Latency의 측면에서 꽤 큰 이점을 볼 수 있는 부분이 되겠다.

WebFlux를 사용해도 Spring을 사용하는 주된 사용자들은 DB와의 연결을 걸어놓았을 텐데, DB가 Blocking 인 경우 결국 Non-Blocking의 이점을 하나도 못얻기 때문에 R2DBC의 이점은 크다고 말할 수 있다

R2DBC는 다음과 같은 뜻을 내포하고 있다고 한다

Reactive Relational Database Connectivity

Features

  • R2DBC 드라이버는 @Configuration 를 통해 Java-based 설정을 가능하게 한다(기존 Datasource 형태로 @Bean등록)
  • DatabaseClient 는 POJO 형태로 Object Relation Mapping(ORM) 을 제공하며, 생산성 향상에 도움을 준다
  • Exception 발생 시, Spring 에서 제공되는 Runtime Exception으로 자동 변환해줌
    • 기존에 제공되던 DataAccessException ... 등과 같은
  • Spring 컨벤션에 사용되는 여러가지 Rich한 특성을 제공한다
  • 어노테이션 기반의 메타데이터를 매핑 가능하며, 확장이 가능하도록 한다
  • 커스텀 쿼리(@Query 등과 같이) 를 포함한 Repository의 쿼리들(CRUD)을 자동으로 R2DBC 방식으로 구현해준다
  • 현재 구현된 RDB
    • MySQL의 경우 github인걸로 보아, 아직 Pivotal팀에서 제공되는 공식 드라이버는 없는 모양임
    • Postgres (io.r2dbc:r2dbc-postgresql)
    • H2 (io.r2dbc:r2dbc-h2)
    • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
    • MySQL (com.github.mirromutth:r2dbc-mysql)
    • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)

Initialize

아래의 디펜던시를 pom.xml 에 넣어주도록 하자

혹은, Spring Initializer 를 사용해서 R2DBC(Experimental) 을 골라서 프로젝트를 세팅해도 된다(이게 더 편함)

필자는 Spring Boot(Reactive) + R2DBC + MySQL Driver +

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.r2dbc</groupId>
      <artifactId>r2dbc-bom</artifactId>
      <version>${r2dbc-releasetrain.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>

  <!-- other dependency elements omitted -->

  <dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-r2dbc</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
  </dependency>

  <!-- a R2DBC driver -->
  <dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>Arabba-RELEASE</version>
  </dependency>

</dependencies>

그리고 application.properties 에 아래 프로퍼티를 넣어서 쿼리를 디버깅해보자.

logging.level.org.springframework.data.r2dbc=DEBUG

R2DBC용 데이터베이스 프로퍼티 설정을 해준다(DB는 위에 나열된 것 중 아무거나 세팅하고 접속.)

테스트 용이라면 H2가 가장 편리하긴 하다(필자는 MySQL 테스트를 위해 MySQL로 설정.)

원래라면 익숙한.. jdbc:mysql:// ... 이 아닌 r2dbc:mysql: ... 으로 시작한다.

spring.r2dbc.url=r2dbc:mysql://localhost:3306/some_db?useUnicode=true&characterEncoding=utf8
spring.r2dbc.username=gompang
spring.r2dbc.password=gompang

까지 설정하게 되면 R2DBC로 DB연결을 완료할 수 있다.

Sample Code

기존에 테스트로 만들어놨던 데이터를 불러와보자. (DB 형태별로 상이하겠지만, 그냥 필자기준에서 테스트코드이다)

User 테이블을 불러올 예정이다

@Data
public class User {
    private String username;
    private String password;
    private String authorities;
    private boolean locked;
    private String ip;
    private String user_agent;
    private Date creation;
    private Date last_access;
}

Repository 를 하나 만들어준다

@Repository
public interface UserRepository extends R2dbcRepository<User, String> {
}

Repository를 갖다 쓰는 Service를 또 하나 만들어주자

@Service
@Slf4j
public class UserService {

    private final UserRepository userRepository;

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public void test() {
        Flux<User> allUser = userRepository.findAll();
        List<User> blocked = allUser.collectList().block();
        blocked.forEach(item -> log.info("user : {}", item));
    }
}

하는일이 너무 간단하긴 한데, repository에서 모든 유저 데이터를 찾은 뒤, for-each로 하나씩 찍는부분이다.

단, 여기서 주목할 점이 findAll()할 경우 기존에 Optional<List<User>> 혹은 List<User> 로 넘어오던 부분이 Flux 로 변경이 되었다(!)

이 말은 즉슨, WebFlux로 해당 데이터를 바로 내려줄 수도 있다는 뜻이며, Flux가 제공하는 Rich한 기능들을 다 사용할 수 있다는 말이다.

위 로직상에서는 block을 해서 결과를 다 기다리지만, 실제 내부 로직상 동작은 Async/Non-Blocking하게 데이터들을 가져오도록 구현이 되어 있다.

기왕 보는김에 SimpleR2dbcRepositoryfindAll()에 대해서 탐구해보자

public class SimpleR2dbcRepository<T, ID> implements ReactiveCrudRepository<T, ID> {
  ...
  public Flux<T> findAll() {
    return this.databaseClient.select().from(this.entity.getJavaType()).fetch().all();
  }
}

databaseClient 는 현재 Spring property에 값을 주어서 자동으로 Auto-configurated되었는데, 원래는 아래와 같이 생성된다.

ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

DatabaseClient client = DatabaseClient.create(connectionFactory);

테스트로 H2 DB를 연결하는 예제이고, ConnectionFactory 기반으로 DatabaseClient 를 생성하게 된다

내부적인 method들이 다 Mono 혹은 Flux로 이루어져 있어서, 해당 기능들을 Wrapping해서 Repository로 제공해주도록 되어 있다.

Performance

그러면 이게 실제로 JDBC에 비해 성능적으로 이점을 봐야 좋은것 아니겠는가.

기존 JDBC는 쿼리마다 Thread를 만들어서 수행해야하기 때문에 자원이 비효율적이라, 그것을 DBCP로 풀어낸 것인데(Tomcat, Hikari ... 등)

Reactive 하다는 것은 Async한 Task들을 EventLoop방식으로 Queue에 쌓아두고, Thread를 보다 효율적으로 사용하는 것으로 자원성능 둘다 강점이 있다.

지금은 자원에 대한 부분보다 성능 이 얼마나 강점이 있나를 한번 확인해보자(자원같은 경우는 로컬PC에서 테스트하기엔 좀 어려운 부분이 있으니..)

측정 기준 : 소요시간 / TPS(초당 처리건수)

Prepare

10만건 Row를 랜덤으로 넣어버릴 것이다.

saveAll()이 Flux이기 때문에 바로 종료되고 작업이 진행이 안되므로, block을 시켜서 다 들어가는지 확인한다.

라고 하여 성능테스를 진행하려 했으나, 진행중 문제가 발생한다.

    @Test
    @Rollback(value = false)
    void put() {
        List<User> users = new ArrayList<>();
        log.info("Start");
        IntStream.range(0, 100000).forEach(i -> {
            User user = new User();
            user.setUsername(UUID.randomUUID().toString() + System.currentTimeMillis());
            user.setPassword(UUID.randomUUID().toString() + System.currentTimeMillis());
            user.setLocked(false);
            users.add(user);
        });
        log.info("End");
        Flux<User> userFlux = userRepository.saveAll(users);
        userFlux.collectList().block();
    }

에러가 발생한다

java.lang.IllegalStateException: Required identifier property not found for class dev.gompang.r2dbcsample.domains.User!
@Data
public class User {
    @Id
    private String username;
    private String password;
    private String authorities;
    private boolean locked;
    private String ip;
    private String user_agent;
    private Date creation;
    private Date last_access;
}

PK에 @Id 어노테이션을 붙여준다

그러면 위의 에러는 없어지는데, 이번에는 다른 문제가 발생한다.

이상하게 Insert를 안하고 Update 를 수행하다 에러가 난다.

org.springframework.dao.TransientDataAccessResourceException: Failed to update table [user]. Row with Id [8e68bb90-f947-4028-b448-2217572dd8b41575798663957] does not exist.

save() 를 뜯어보자.

return this.entity.isNew(objectToSave) ? this.databaseClient.insert().into(this.entity.getJavaType()).table(this.entity.getTableName()).using(objectToSave).map(this.converter.populateIdIfNecessary(objectToSave)).first().defaultIfEmpty(objectToSave) : this.databaseClient.update().table(this.entity.getJavaType()).table(this.entity.getTableName()).using(objectToSave).fetch().rowsUpdated().handle((rowsUpdated, sink) -> {
  ...
}

this.entity.isNew(objectToSave) 를 가지고 새로 생성되는 오브젝트인지, Update할 오브젝트인지 확인을 하는 모양이다.

PersistentEntityInformation에 구현이 되있는 isNew()를 보자.

public boolean isNew(T entity) {
        return this.persistentEntity.isNew(entity);
    }

그럼 persistentEntity를 봐야하겠다. 또 들어가보자

public class BasicPersistentEntity<T, P extends PersistentProperty<P>> implements MutablePersistentEntity<T, P> {
  ...
    public boolean isNew(Object bean) {
        this.verifyBeanType(bean);
        return ((IsNewStrategy)this.isNewStrategy.get()).isNew(bean);
    }
}

그럼 PersistentEntityIsNewStrategy의 isNew()가 있다(언제까지 들어갈거냐)

    public boolean isNew(Object entity) {
        Object value = this.valueLookup.apply(entity);
        if (value == null) {
            return true;
        } else if (this.valueType != null && !this.valueType.isPrimitive()) {
            return value == null;
        } else if (Number.class.isInstance(value)) {
            return ((Number)value).longValue() == 0L;
        } else {
            throw new IllegalArgumentException(String.format("Could not determine whether %s is new! Unsupported identifier or version property!", entity));
        }
    }

이쯤이면 정신이 몽롱해진다.

어찌된 영문인지 isNew() 에서 false가 나오니까 UPDATE를 수행하려고 하는 것 같다.

왜?

궁금해서 R2DBC 깃헙 이슈로 올려서 확인해보고자 했다.

질문한 이슈

 

TransientDataAccessResourceException(Failed to update table) issue · Issue #275 · spring-projects/spring-data-r2dbc

Hi. I've some problem when I using R2DBC Spring boot version org.springframework.boot spring-boot-starter-parent

github.com

답변 :

Because you've provided the @Id. The library needs to figure out, whether the row is new or whether it should exist. See #232 and #253 for further reference.

 

@Id 를 줬기 때문이고, 이는 라이브러리에서 새로 생성되었는지 아니면 기존에 있었던건지를 파악해야 하기 때문이다. 라고 했다.

 

사실 여기까지만 해도 잘 이해가 안갔는데, 결국 원인이라고 밝혀진건, 위의 #232랑 #253번 링크에서 밝혀졌다.

이미, 나와 같은 문제를 겪고 있던 사람들이 있었고 #253번 이슈를 참고해보면 답변이 아래와 같이 달려있다.

 

Thanks for report. The mentioned exception is signaled when a save(…) determines that the object is not new and if the UPDATE statement executes without updating a row. We added this behavior with #232 if an UPDATE returns zero rows were affected by a change to avoid updates that do not exist on the database (e.g. object with an assigned Id is considered not new which is a common bug when someone assumes the row should be inserted).

We haven't considered that MySQL has slightly different behavior by default. The MySQL docs say:

For UPDATE statements, the affected-rows value by default is the number of rows actually changed. If you specify the CLIENT_FOUND_ROWS flag to mysql_real_connect() when connecting to mysqld, the affected-rows value is the number of rows “found”; that is, matched by the WHERE clause.

 

요약 : MySQL 드라이버에서 해당 기능을 지원하는지 확인해봐라. R2DBC에서 Insert와 Update를 판단하는 방법은, UDPATE를 했을 때 zero rows 가 return 되면(즉, 업데이트 할 게 없다라는 뜻) Update를 수행하고 아니면 Insert를 하는 방식으로 구현이 되어 있던 것이다.

Found RowAffected row는 틀리다. 검색된 데이터와, 실제 영향 받는 데이터가 틀리기 때문에 이러한 세부적인 구현에 문제가 있었던 것이다.

문제가 되는 소스코드를 확인해보자. (링크)

public final class Capabilities {
  // 다른 소스코드는 중략

//    public static final int FOUND_ROWS = 2; // Return found rows instead of affected rows, should not enable this.

FOUND_ROWS 라는 필드가 아예 주석처리 되어있다. 심지어 should not enable this 라고 적혀있기 까지..

이게 현재 사용했던 0.8.0.RELEASE버전에서 구현이 저렇게 되어 있었다..

그렇다면.. master branch 는 어떨까 해서 확인해보니

/**
 * Values for the capabilities flag bitmask used by the MySQL Client/Server Protocol.
 */
public final class Capabilities {
  // 다른 소스코드는 중략
    /**
     * Use found/touched rows instead of changed rows for affected rows, should enable this.
     */
    private static final int FOUND_ROWS = 2;

예? 주석처리 풀고, should enable this 라는 설명까지(?) 붙여있다.

found/affected rows에 대한 기능을 지원하기 위해 준비중인 것으로 보인다.

그럼, 실제 master branch 를 fork해서(뭔가 점점 산으로 가고 있다.) FOUND_ROWS 의 주석이 풀린버전을 사용해본다면 문제가 해결되는지 궁금해지기 시작했다.

github에서 repo 를 fork 한 뒤, 최신버전 기준(0.8.1) 을 clone한다.

이후, mvn clean package 로 해당 프로젝트를 빌드해서 jar파일을 프로젝트에 추가한다.

자 이제 실행시켜보자

org.springframework.dao.TransientDataAccessResourceException: Failed to update table [user]. Row with Id [a0c8f013-13c8-4543-a71f-6e31b78794441579092477804] does not exist.

여전하다.

아까의 문제에서 아래와 같이 언급을 했는데, 저 상수를 enable하면 뭔가 될것같이 말했는데, 저 상수만 enable하는것이 아닌 R2DBC에서 또 옵션을 연동하는 작업을 해줘야 하나보다.

It should be enabled by default if this will affect Spring Data R2DBC behavior. I will change it soon.

2019/12/9에 will change it soon 이라고 말했으니.. 좀 더 기다려봐야 할 것 같다.

UPDATE(2020/11/30)

오랜만에 다시 dependency version up 후 다시 시도를 해보았다.

Repository

public interface UserRepository extends R2dbcRepository<User, Integer> {
}

User Domain

@Data
public class User {
    @org.springframework.data.annotation.Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column
    private String name;
}

으로 테스트해봤을때

동작이 잘 되는것을 알 수 있다.

이로써 insert.. 같은 기본적인 operation에 대해 동작은 하는것으로 보인다.

변경점이 꽤 많았을 듯 한데

Spring Initializer에서 MySQL + JPA + R2DBC를 선택할 시 아래와 같은 디펜던시를 설정해준다

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>dev.miku</groupId>
            <artifactId>r2dbc-mysql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

dev.miku의 mysql 드라이버를 기본적으로 사용하게..(공식 드라이버는 아니지만, 공식사이트에서 지원하는 비공식 드라이버.. 같은거랄까) 되어있다.

0.8.0.RC2 이전 버전으로 테스트중이었는데,

0.8.0.RC2 버전에서 "Fixed capabilities flag with flags mask for remove all unknown flags" 라는 버그 수정항목이 있다.

위에서 언급한 Capabilities 클래스의 필드들이 수정된 것으로 보이며, Failed Update... 의 에러는 이 버전 이후로 수정되지 않았을까 싶다.

위와 같이 디펜던시를 선택하면, 문제없이 잘 동작한다.

다만, 도중에 r2dbc가 jpa랑은 다른 spring data 로 변경이 된 듯 한데

@Id의 어노테이션 사용 시 java.persistence의 @Id 말고, springframework.data.annotation.Id 이걸 사용해야 한다

그렇지 않으면 위에서 나왔던 Required identifier property not found for class 에러를 볼 수 있다.

P.S

findAll() 과 같은, SELECT 류 쿼리는 문제없이 잘 동작한다.

원래 의도였던 JDBC와의 성능비교를 하려던 참에 무산이 되어 허무하긴한데, 그래도 아직 개발중인 단계에서 이런 부분 삽질을 하게되어 공유하고자 포스팅을 해본다.

R2DBC와 MySQL을 사용하는 부분은 아직 지양해야 할 것 같다.

정규 RELEASE가 나오게 되면 그때 다시 연동해보자.

P.S 2

자매품으로 jasync-sql 이라는 라이브러리도 개발중인데,실제 Native Driver가 아닌 Wrapped Driver라서 실제 Spring R2DBC에서 사용하는 ConnectionFactory 를 지원해주지 않는 듯 하다(잠깐 맛보기로 봤을 땐..)

DatabaseClient를 직접 사용해서 쿼리를 작성해야 하는데, 이런 경우는 처음에 사용했던 mirromutth/r2dbc-mysql 와 별반 차이가 없다. 왜냐면 mirromutth/r2dbc-mysql 도 DatabaseClient 를 직접 사용하면 insert를 명시적으로 함수호출하여 사용할 수 있기 때문이다.

R2DBC에서 지원하는 드라이버들의 집합은 아래 사이트에서 확인가능하다.
https://r2dbc.io/

  • r2dbc-mysql - Native driver implemented for MySQL.
  • jasync-sql - R2DBC wrapper for Java & Kotlin Async Database Driver for MySQL and PostgreSQL written in Kotlin.
반응형

'Development > Database' 카테고리의 다른 글

[NoSQL] Aerospike란?  (1) 2018.03.20