Category: RxJava

Vision Statement

NoSQL databases are dominating enterprise systems. One reason is that they do not require precise data schema and can deal with new data without triggering massive changes. The idea is to achieve schema flexibility with the typical SQL database like PostgreSQL or MySql. In general, NoSQL databases require some key/index/primary fields to enable fast lookups. If we preserve the concept of necessary fields and store the rest of the document in the JSON text field we can accomplish similar flexibility with the plain old SQL database.

Use Case

Imagine some users table where fields like username and email are required to identify a user, but user attributes like phone, lastName, firstName can be just stored as part of the user record.

Implementation Details

Here is the database schema for the above use case using H2 Java in-memory database:

drop table if exists users;
create table users
(
    id       bigint       NOT NULL auto_increment,
    username varchar(50)  not null,
    password varchar(255) not null,
    enabled  boolean      not null,
    email    varchar(255) not null,
    object   varchar(4048)         NOT NULL DEFAULT '{}',
    CONSTRAINT pk_users PRIMARY KEY (id)
);

In this example, application key fields defined as typical SQL schema and the rest of the document is stored in the object field. Now, all we need to do is merge key fields into a final object. I used ebean ORM library to implement database connectivity and statement execution and command/reactive design pattern explained in my RxJava post to implement DAO layer.

Model

itzap-ebeans project implements this idea and contains an example UserDao implementation. Simple user model looks like this:

@JsonDeserialize(builder = User.Builder.class)
public class User extends Auditable {
    private final String lastName;
    private final String firstName;
    private final String email;
    private final String phone;

    @JsonProperty("enabled")
    private final Boolean enabled;
    private final String username;

    @JsonIgnore
    private final String password;

    public User(Builder builder) {
        super(builder);

        this.lastName = builder.lastName;
        this.firstName = builder.firstName;
        this.email = builder.email;
        this.phone = builder.phone;
        this.enabled = builder.enabled;
        this.username = builder.username;
        this.password = builder.password;
    }

    public String getLastName() {
        return lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getEmail() {
        return email;
    }

    public String getPhone() {
        return phone;
    }

    public Boolean getEnabled() {
        return enabled;
    }

    public String getUsername() {
        return username;
    }
  
    public String getPassword() {
        return password;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this.getClass())
                .add("lastName", lastName)
                .add("email", email)
                .add("username", username)
                .toString();
    }

    @JsonPOJOBuilder(withPrefix = "set")
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Builder extends Auditable.Builder<User, User.Builder> {
        private String lastName;
        private String firstName;
        private String email;
        private String phone;
        private Boolean enabled;
        private String username;
        private String password;

        public Builder() {
        }

        @Override
        protected Builder getThis() {
            return this;
        }

        public Builder setLastName(String lastName) {
            this.lastName = lastName;
            return this;
        }

        public Builder setFirstName(String firstName) {
            this.firstName = firstName;
            return this;
        }

        public Builder setEmail(String email) {
            this.email = email;
            return this;
        }

        public Builder setPhone(String phone) {
            this.phone = phone;
            return this;
        }

        public Builder setEnabled(Boolean enabled) {
            this.enabled = enabled;
            return this;
        }

        public Builder setUsername(String username) {
            this.username = username;
            return this;
        }

        public Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        @Override
        public Builder merge(User org) {
            Builder bld = super.merge(org)
                    .setEmail(StringUtils.defaultIfBlank(this.email, org.getEmail()))
                    .setEnabled(ObjectUtils.defaultIfNull(this.enabled, org.getEnabled()))
                    .setLastName(StringUtils.defaultIfBlank(this.lastName, org.getLastName()))
                    .setUsername(StringUtils.defaultIfBlank(this.username, org.getUsername()))
                    .setFirstName(StringUtils.defaultIfBlank(this.firstName, org.getFirstName()))
                    .setPassword(StringUtils.defaultIfBlank(this.password, org.getPassword()))
                    .setPhone(StringUtils.defaultIfBlank(this.phone, org.getPhone()));

            return bld;
        }

        @Override
        public User build() {
            return new User(this);
        }
    }

    public static Builder from(User user) {
        Builder builder = new Builder();
        BeanUtils.copyProperties(user, builder);
        return builder;
    }

    public static Builder builder() {
        return new Builder();
    }
}

Note all the Json annotations that enable serialization/deserialization of the model and merge method that provides rules for merging database and Json objects.

Dao Implementation

Dao layer implemented using two base commands: ListBaseEbeanCommand and BaseBulkEbeanCommand. ListBaseEbeanCommand is the base command for all select operations and BaseBulkEbeanCommand is the base command for all Insert/Update/Delete operations. Base commands operate using EbeanHandler and report exceptions using EbeanErrorHandler. Complete DAO implementation looks like this:

public class UserDao extends AbstractServiceDao {
    private static final String SELECT_ALL_USERS = "select * from users";
    private static final String WHERE_USER = " where username=:username";
    private static final String WHERE_USER_ID = " where id=:id";

    private static final String ADD_USER = "INSERT INTO public.users(" +
            "            username, password, enabled, email, object)" +
            "    VALUES (:username, :password, :enabled, :email, :object)";
    private static final String UPDATE_USER = "UPDATE public.users " +
            "   SET username=:username, password=:password, enabled=:enabled, email=:email, object=:object" +
            " WHERE id=:id";
    private static final String DELETE_USER = "DELETE FROM users WHERE id=:id";

    public UserDao(Servers.Server server) {
        super(server);
    }

    public Observable<User> getUsers() {
        return new SelectUsersCommand()
                .toObservable()
                .flatMap(Observable::fromIterable);
    }

    public Observable<User> getUserByName(String name) {
        return new SelectUsersCommand()
                .setName(name)
                .toObservable()
                .map(users -> users.get(0));
    }

    public Observable<User> getUser(Long userId) {
        return new SelectUsersCommand()
                .setUserId(userId)
                .toObservable()
                .map(users -> users.get(0));
    }

    public Observable<Integer> addUser(User user) {
        return new BaseBulkEbeanCommand<BaseBulkEbeanCommand>("cmd-addUser-cmd",
                new UserHandler(user)) {
            @Override
            protected BaseBulkEbeanCommand getThis() {
                return this;
            }

            @Override
            protected SqlUpdate createSql() {
                return server.getServer().createSqlUpdate(ADD_USER);
            }
        }.toObservable();
    }

    public Observable<Integer> updateUser(Long userId, User user) {
        return new SelectUsersCommand()
                .setName(user.getUsername())
                .toObservable()
                .flatMap(orgUser -> new BaseBulkEbeanCommand<BaseBulkEbeanCommand>("cmd-updateUser-cmd",
                        new UserHandler(userId, user, orgUser.get(0))) {
                    @Override
                    protected BaseBulkEbeanCommand getThis() {
                        return this;
                    }

                    @Override
                    protected SqlUpdate createSql() {
                        return server.getServer().createSqlUpdate(UPDATE_USER);
                    }
                }.toObservable());
    }

    public Observable<Integer> deleteUser(Long id) {
        return new BaseBulkEbeanCommand<BaseBulkEbeanCommand>("cmd-deleteUser-cmd",
                new DeleteEbeanHandler<>(QueryParameter.id(id))) {
            @Override
            protected BaseBulkEbeanCommand getThis() {
                return this;
            }

            @Override
            protected SqlUpdate createSql() {
                return server.getServer().createSqlUpdate(DELETE_USER);
            }
        }.toObservable();
    }

    class UserHandler extends AbstractAuditableHandler<User, User.Builder> {
        UserHandler(User user) {
            this(null, user, User.builder().build());
        }

        UserHandler(Long userId, User user, User dbUser) {
            super(userId, user, dbUser);
        }

        @Override
        protected User.Builder rowProperties(SqlUpdate updateQuery, User user) {
            updateQuery.setParameter("username",
                    StringUtils.defaultIfBlank(user.getUsername(), orgObject.getUsername()));

            String pass = getPassword(user);
            if (StringUtils.isBlank(pass)) {
                updateQuery.setParameter("password", orgObject.getPassword());
            } else {
                updateQuery.setParameter("password", pass);
            }
            updateQuery.setParameter("enabled", ObjectUtils.defaultIfNull(user.getEnabled(),
                    orgObject.getEnabled()));
            updateQuery.setParameter("email",
                    StringUtils.defaultIfBlank(user.getEmail(), orgObject.getEmail()));

            return audibalBuilder(user,  User.from(user));
        }
    }

    private static String getPassword(User user) {
        return user.getPassword();
    }

    private static  User.Builder fromRow(SqlRow row, User user) {
        return User.from(user)
                .setUsername(row.getString("username"))
                .setEmail(row.getString("email"))
                .setEnabled(row.getBoolean("enabled"))
                .setPassword(row.getString("password"));
    }

    class SelectUsersCommand extends ListBaseEbeanCommand<User, SelectUsersCommand> {
        private String name;
        private Long userId;

        public SelectUsersCommand setName(String name) {
            this.name = name;
            return getThis();
        }

        public SelectUsersCommand setUserId(Long userId) {
            this.userId = userId;
            return this;
        }

        SelectUsersCommand() {
            super("cmd-get-users",
                    new AbstractSelectListEbeanHandler<User, User.Builder>(User.class) {
                        @Override
                        protected User.Builder from(SqlRow row, User user) {
                            return fromRow(row, user);
                        }
                    });
        }

        @Override
        protected SelectUsersCommand getThis() {
            return this;
        }

        @Override
        protected SqlQuery createSql() {
            String sql = SELECT_ALL_USERS;
            if (StringUtils.isNotBlank(this.name)) {
                sql = sql + WHERE_USER;
            } else if (this.userId != null) {
                sql = sql + WHERE_USER_ID;
            }

            SqlQuery qry = server.getServer().createSqlQuery(sql);
            if (StringUtils.isNotBlank(this.name)) {
                qry.setParameter("username", this.name);
            } else if (this.userId != null) {
                qry.setParameter("id", this.userId);
            }
            return qry;
        }
    }
}

Readme

itzap-ebeans

itzap-ebeans provides an easy to use library for building reactive DAO with flexible schema models. Visit my ITZap blog to read more about this project.

How To Build

  • Clone the following projects:
    • git clone git@github.com:avinokurov/itzap-parent.git
    • git clone git@github.com:avinokurov/itzap-common.git
    • git clone git@github.com:avinokurov/itzap-rxjava.git
    • git clone git@github.com:avinokurov/itzap-ebeans.git
  • Build all projects
    • cd itzap-parent && mvn clean install
    • cd ../itzap-common && mvn clean install
    • cd ../itzap-rxjava && mvn clean install
    • cd ../itzap-ebeans && mvn clean install
  • Example
    • itzap-beans project contains sample UserDao implementation

Code

Complete implementation can be found here:

Vision Statement

Build 7.3.x elasticsearch embedded server into itzapelasticsearch library to enable easy prototyping and unit/integration testing

How It Works

Elasticsearch embedded server starts a single elasticsearch Node. Elasticsearch server node will be listening for incoming requests on port 9200 Elasticsearch client can be configured to access embedded server using the following url: http://localhost:9200 Elasticsearch embedded server will create data and home temp folders that will be deleted opon application close.

Code

Here is the main class that implements elasticsearch embedded server node

public class EmbeddedElasticSearchServer implements IOData {
  private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedElasticSearchServer.class);

  private Node instance;
  private int port;
  private final AnyConfig config;

  public EmbeddedElasticSearchServer(AnyConfig config) {
    this.config = config;
  }

  private static class PluginConfigurableNode extends Node {
    PluginConfigurableNode(Settings input,
                           Map<String, String> properties,
                           Path configPath,
                           Supplier<String> defaultNodeName,
                           Collection<Class<? extends Plugin>> classpathPlugins) {
      super(InternalSettingsPreparer.prepareEnvironment(input, properties, configPath, defaultNodeName), classpathPlugins, false);
    }
  }

  @Override
  public synchronized Completable start() {
    return new RunnableCommand<Void>("cmd-start") {
      @Override
      protected Void run() {
        Settings settings = getSettings();

        instance = new PluginConfigurableNode(settings, ImmutableMap.of(),
                                              null, () -> config.getString(EsConfig.CLUSTER_NAME),
                                              singletonList(Netty4Plugin.class));
        try {
          instance.start();
          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
              if (instance != null) {
                instance.close();
              }
            } catch (IOException e) {
              LOGGER.error("Error closing ElasticSearch");
            }
          }));
          LOGGER.info("ElasticSearch cluster {} started in local mode on port {}", instance.settings().get("cluster.name"),
                      port);
          return null;
        } catch (NodeValidationException e) {
          throw new IZapException("Failed to start embedded elastic search server", e);
        }
      }
    }.toCompletable();
  }

  @Override
  public synchronized Completable stop() {
    return new RunnableCommand<Void>("cmd-stop") {
      @Override
      protected Void run() {
        if (instance != null && !instance.isClosed()) {
          LOGGER.info("Stopping Elastic Search");
          try {
            instance.close();
            instance = null;
            LOGGER.info("Elastic Search on port {} stopped", port);
          } catch (IOException e) {
            throw new IZapException("Failed to close elastic search embedded server", e);
          }
        }

        return null;
      }
    }.toCompletable();
  }

  private Settings getSettings() {
    String clusterName = config.getString(EsConfig.CLUSTER_NAME);
    String host = config.getString(EsConfig.HOST);
    port = config.getInt(EsConfig.PORT);

    try {
      File dataDir = Files.createTempDirectory(clusterName + "_" + System.currentTimeMillis() + "data").toFile();
      FileUtils.forceDeleteOnExit(dataDir);
      cleanDataDir(dataDir.getAbsolutePath());

      File homeDir = Files.createTempDirectory(clusterName + "_" + System.currentTimeMillis() + "-home").toFile();
      cleanDataDir(homeDir.getAbsolutePath());
      FileUtils.forceDeleteOnExit(homeDir);

      Settings.Builder settingsBuilder = Settings.builder()
        .put("cluster.name", clusterName)
        .put("http.host", host)
        .put("http.port", port)
        .put("transport.tcp.port", port + 100)
        .put(EsConfig.DATA_PATH.getName(), dataDir.getAbsolutePath())
        .put(EsConfig.HOME_PATH.getName(), homeDir.getAbsolutePath())
        .put("http.cors.enabled", true)
        .put("node.data", true)
        .put("http.type", "netty4")
        .put("transport.type", "netty4");

      return settingsBuilder.build();
    } catch (IOException e) {
      throw new IZapException("Failed to create temp data/home dir.", e);
    }
  }
}

Elasticsearch embedded server implementation is using RunnableCommand command pattern to implement start and stopmethods. Please note that RunnableCommand returns Completable and not Observable. Sice Observable<Void> is no longer permitted in RxJava2 and start/stop methods have no return values. Here is the unit test for the elasticsearch embedded server

public class EmbeddedElasticSearchServerTest {
    private EmbeddedElasticSearchServer server;

    @Before
    public void setup() {
        AnyConfig config = ConfigBuilder.builder(ConfigType.TYPE_SAFE)
                .setFileName(this.getClass()
                        .getResource("/es-config.properties").getFile())
                .build();

        server = new EmbeddedElasticSearchServer(config);
    }

    @Test
    public void startTest() {
        server.start().blockingGet();
        server.stop().blockingGet();
    }
}

Elasticsearch dependencies

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>7.3.1</version>
</dependency>

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>7.3.1</version>
</dependency>

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>transport</artifactId>
  <version>7.3.1</version>
</dependency>

Command Pattern

Command pattern can be extended using RxJava to create a robust application code. This idea is an extension of the NetflixObservableCommand.

Vision Statement

Create a lightweight extensible library that implements a command pattern with RxJava extension. This library should provide an easy way to create commands and turning them into Reactive Observables. Commands will wrap exception handling and provide execution time metric for tracability.

Use Case

In Service-Oriented Programming, it is a common need to combine results of one service execution with the result of another service execution. This is a good fit for the command pattern. However, it would be great to be able to execute services in parallel whenever possible. Creating a reactive application service layer makes a robust and efficient solution for business logic implementation.

Other Solutions

Already mentioned NetflixObservableCommand can be used to implement reactive services.

Why itzap-rxjava

itzaprxjava provides and easy way to turn any executable code into RxJava observable. For example

public Observable<Pair[]> read() { return new RunnableCommand<Pair[]>("cmd-read-flatFile") { @Override protected Pair[] run() { // do some processing return new Pair[0]; } }.toObservable(); }
Code language: PHP (php)

In this example, file processing code is wrapped into RxJava Observable with ease. In addition, itzap-rxjava will provide logging statements that would measure execution time.

Code