Demo application

The demo application emulates a distributed scenario.

Check out the source repository:

$ git clone git@github.com:gcristian/minka.git
$ cd minka
$ ls
drwxrwxr-x 2 cristian cristian 4,0K dic  9 21:59 doc
-rwxrwxr-x 1 cristian cristian   38 dic  9 02:32 go
-rw-rw-r-- 1 cristian cristian  12K dic  9 02:32 LICENSE
drwxrwxr-x 6 cristian cristian 4,0K dic  9 02:32 samples
drwxrwxr-x 3 cristian cristian 4,0K dic  9 02:32 server
drwxrwxr-x 5 cristian cristian 4,0K dic  9 02:32 spectator
-rw-rw-r-- 1 cristian cristian 2,3K dic  9 21:59 readme.md
-rw-rw-r-- 1 cristian cristian 4,5K dic  9 02:32 pom.xml

The core lives in the server/ directory, and the demo application in samples/.
The demo app is launched with the newsampler.sh script.

$ cd samples
$ ls

drwxrwxr-x 2 cristian cristian 4,0K dic  9 02:32 datasets
-rwxrwxr-x 1 cristian cristian  823 oct 30 17:12 sampler
drwxrwxr-x 4 cristian cristian 4,0K ago 21 20:22 src
drwxrwxr-x 7 cristian cristian 4,0K nov 26 21:50 target
-rwxrwxr-x 1 cristian cristian  853 dic  9 02:32 newsampler.sh
-rw-rw-r-- 1 cristian cristian 4,1K dic  9 02:32 pom.xml

$ cat newsampler.sh

#!/bin/bash
# ------------------- Minka Demo launcher --------------------
# Scans for a free port to configure the minka broker, starting
# at port 9000, and takes the first one that is neither bound
# nor claimed by another running sampler.
# ------------------------------------------------------------

port_start=9000
host='localhost'
xms='512M'
pp='broker.hostPort'
# dataset file path: first argument, or dataset.properties by default
dsfp=$(pwd)'/'${1:-'dataset.properties'}
echo "$dsfp"
for i in {0..20}; do
    port=$((i + port_start))
    # bound ports (netstat) and ports claimed by a running sampler (ps)
    x=$(netstat -ltn | grep "$port" | grep -v grep)
    y=$(ps aux | grep "$pp=$host:$port" | grep -v 'grep')
    if [ -z "$x" ] && [ -z "$y" ]; then
        echo "Using port: $port"
        mvn -DXms$xms -DXmx$xms exec:java \
            -Dexec.mainClass=DatasetSampler -Dmins=1440 \
            -D$pp=$host:$port -Ddataset.filepath=$dsfp
        exit
    else
        echo "Port: $port busy (netstat got $x)"
    fi
done

Emulated scenario

The script tests ports for availability starting at 9000, and runs a Java process with a dataset file as its parameter.
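
For illustration, the same first-free-port scan could be written in plain Java, probing each candidate port with a bind attempt (a minimal sketch; the class and method names here are hypothetical, not part of Minka):

import java.io.IOException;
import java.net.ServerSocket;

public class FreePortScan {
    // try to bind each candidate port and return the first that succeeds
    static int firstFreePort(int start, int tries) {
        for (int port = start; port < start + tries; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return port; // bind succeeded: the port is free
            } catch (IOException busy) {
                // port already in use: keep scanning
            }
        }
        throw new IllegalStateException("no free port found");
    }

    public static void main(String[] args) {
        System.out.println("Using port: " + firstFreePort(9000, 20));
    }
}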

Maven 3.x must be installed.

A dataset file contains instructions about the stage we are simulating.

We'll be using mix.properties, which describes a heterogeneous stage:

$ ls
-rw-rw-r-- 1 cristian cristian  331 dic  9 02:32 dataset-cluster1.properties
-rw-rw-r-- 1 cristian cristian  331 dic  9 02:32 dataset-cluster2.properties
-rw-rw-r-- 1 cristian cristian  351 dic  9 02:32 dataset.properties
-rw-rw-r-- 1 cristian cristian  155 dic  9 02:32 dataset-sizy.properties
-rw-rw-r-- 1 cristian cristian  177 dic  9 02:32 fairy.properties
-rw-rw-r-- 1 cristian cristian  740 dic  9 02:32 mix.properties
-rw-rw-r-- 1 cristian cristian 2,2K dic  9 02:32 sample.properties

$ cat datasets/mix.properties

duties.size = 1000
shards.ports = 9001;9002;9000
duties.pallets = Finwe:200:50~100:FAIR_WEIGHT; Ewok:500:500~1000:EVEN_WEIGHT; Cirith:100:1:EVEN_SIZE; Manwe:150:50~1000:FAIR_WEIGHT;
shards.capacities = 9001:Finwe:1000; 9001:Ewok:3000; 9001:Manwe:25000; 9002:Finwe:1500; 9002:Ewok:2000; 9002:Manwe:11000; 9000:Finwe:15000; 9000:Ewok:24000; 9000:Manwe:16000; 9003:Finwe:15000; 9003:Ewok:35500; 9003:Manwe:45000

I will explain some properties:

shards.ports = 9001;9002;9000

This dataset file requires launching 3 shards, so we'll repeat the same command 3 times.

duties.pallets = Finwe:200:50~100:FAIR_WEIGHT; 

A pallet named Finwe will be grabbing 200 duties, each weighing a random value between 50 and 100, using the fair-weight balancer.

shards.capacities = 9001:Finwe:1000;

The shard that first takes port 9001 will report a maximum weight capacity of 1000 for the pallet Finwe…
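
To make the layout concrete, here is a minimal, hypothetical parser (illustration only, not Minka's code) for the duties.pallets entries: each semicolon-separated entry is name:count:weightRange:balancer, where weightRange is a fixed value or a min~max range. The shards.capacities property follows the same semicolon-separated style with port:pallet:capacity fields.

import java.util.ArrayList;
import java.util.List;

public class PalletSpec {
    final String name;        // e.g. "Finwe"
    final int duties;         // e.g. 200
    final double minWeight;   // e.g. 50
    final double maxWeight;   // e.g. 100
    final String balancer;    // e.g. "FAIR_WEIGHT"

    PalletSpec(String entry) {
        String[] f = entry.trim().split(":");
        name = f[0];
        duties = Integer.parseInt(f[1]);
        String[] w = f[2].split("~"); // fixed weight or min~max range
        minWeight = Double.parseDouble(w[0]);
        maxWeight = Double.parseDouble(w[w.length - 1]);
        balancer = f[3];
    }

    static List<PalletSpec> parseAll(String property) {
        List<PalletSpec> pallets = new ArrayList<>();
        for (String entry : property.split(";")) {
            if (!entry.trim().isEmpty()) { // skip the trailing separator
                pallets.add(new PalletSpec(entry));
            }
        }
        return pallets;
    }
}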

Let’s go

Create the logging output folder first:

$ sudo mkdir /var/log/minka
$ sudo chown cristian: /var/log/minka

Fire the same command in 3 different consoles, so we can identify each shard's output individually:

$ ./newsampler.sh datasets/mix.properties

A few seconds later the proctor, balance, and distribution phases run, and the duties get distributed.
To check this we can request a JSON status from the leader shard.

By default the webserver's port is 57480 and the broker's port is 5748. When we change the broker's default port, Minka adds 100 to the broker's port to resolve the webserver port: in this case the broker takes port 9000, so this shard's webserver becomes 9100. Unless specifically configured otherwise, the binding interface is the same as the broker's.

This allows several shards to run on the same machine without colliding on any Minka port, as in this case, where the newsampler.sh script parametrizes ports in the range 9000..90xx.
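
Under that rule, resolving the admin endpoint of a given broker is simple arithmetic (a hypothetical helper, not part of the Minka API):

public class AdminEndpoints {
    // unless explicitly configured, the webserver listens on the broker's
    // interface at the broker port plus 100 (broker 9000 -> webserver 9100)
    static String adminUrl(String brokerHost, int brokerPort, String endpoint) {
        int webServerPort = brokerPort + 100;
        return "http://" + brokerHost + ":" + webServerPort + "/minka/admin/" + endpoint;
    }

    public static void main(String[] args) {
        System.out.println(adminUrl("localhost", 9000, "proctor"));
    }
}

Running it prints the URL queried below.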

So let’s find the leader shard first:

$ curl localhost:9100/minka/admin/proctor

{
  "leaderShardId": {
    "port": "9000",
    "sourceHost": "192.168.1.102",
    "id": "192.168.1.102:9000"
  },
  "shards": []
}

So now we know that the shard living on port 9000 has become the leader.
During its leadership we can ask it for the Status object.
For the sake of space, I'm using a different dataset sampler file here.

$ curl -svn localhost:9100/minka/admin/status | jq .

{
  "global": {
    "unstaged-size": "20",
    "staged-size": "80",
    "pallet-size": "1",
    "duty-size": "100",
    "shard-size": "2"
  },

The Status holds information about the global distribution of duties: the number of shards, the duties distributed (staged), and the duties left unassigned (unstaged) when there is not enough weight capacity.
The pallets section shows the pallets loaded so far, and their distribution status:

  "pallets": [
    {
      "balancer": "io.tilt.minka.core.leader.balancer.FairWeightBalancer",
      "balancer-metadata": {
        "dispersion": "EVEN",
        "presort": "DATE"
      },
      "unstaged-weight": "100.0",
      "id": "Fairy",
      "creation": "2016-12-19T02:15:53.331Z",
      "size": "100",
      "cluster-capacity": "400.0",
      "allocation": "80%",
      "staged-size": "80",
      "staged-weight": "400.0",
      "unstaged-size": "20"
    }
  ],

The Status also shows the distributed shards, and which entities each of them has captured so far.
Duties reside in the duties string field, in the format id1:weight1,id2:weight2,… (a parsing sketch follows the listing).

  "shards": [
    {
      "pallets": [
        {
          "duties": "90:5.0,91:5.0,92:5.0,93:5.0,94:5.0,95:5.0,96:5.0,97:5.0,10:5.0,98:5.0,11:5.0,99:5.0,12:5.0,13:5.0,14:5.0,15:5.0,16:5.0,17:5.0,18:5.0,19:5.0,0:5.0,1:5.0,2:5.0,3:5.0,4:5.0,5:5.0,6:5.0,7:5.0,8:5.0,9:5.0,20:5.0,21:5.0,22:5.0,23:5.0,24:5.0,25:5.0,26:5.0,27:5.0,28:5.0,29:5.0,70:5.0,71:5.0,72:5.0,73:5.0,74:5.0,75:5.0,76:5.0,77:5.0,78:5.0,79:5.0,80:5.0,81:5.0,82:5.0,83:5.0,84:5.0,85:5.0,86:5.0,87:5.0,88:5.0,89:5.0,",
          "weight": "300.0",
          "capacity": "300.0",
          "size": "60",
          "id": "Fairy"
        }
      ],
      "status": "ONLINE",
      "creation": "2016-12-19T02:39:25.669Z",
      "host": "192.168.1.102:9001"
    },
    {
      "pallets": [
        {
          "duties": "60:5.0,50:5.0,61:5.0,51:5.0,62:5.0,52:5.0,63:5.0,53:5.0,64:5.0,54:5.0,65:5.0,55:5.0,66:5.0,56:5.0,67:5.0,57:5.0,68:5.0,58:5.0,69:5.0,59:5.0,",
          "weight": "100.0",
          "capacity": "100.0",
          "size": "20",
          "id": "Fairy"
        }
      ],
      "status": "ONLINE",
      "creation": "2016-12-19T02:15:42.316Z",
      "host": "192.168.1.102:9000"
    }
  ]
}
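
Assuming the comma-separated id:weight layout described above, the duties field can be unpacked with a few lines of Java (a sketch, not part of the Minka API):

import java.util.LinkedHashMap;
import java.util.Map;

public class DutiesField {
    // unpack "id1:weight1,id2:weight2,..." into an id -> weight map
    static Map<String, Double> parseDuties(String duties) {
        Map<String, Double> weightById = new LinkedHashMap<>();
        for (String pair : duties.split(",")) {
            if (pair.isEmpty()) {
                continue; // tolerate the trailing comma in the field
            }
            String[] idAndWeight = pair.split(":");
            weightById.put(idAndWeight[0], Double.parseDouble(idAndWeight[1]));
        }
        return weightById;
    }
}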

There’s also the configuration that the current shard is running with:

$ curl -svn 192.168.1.102:9100/minka/admin/config | jq .

{
  "consistency": {
    "dutyStorage": "CLIENT_DEFINED"
  },
  "proctor": {
    "clusterHealthStabilityDelayPeriods": 1,
    "heartbeatMaxDistanceStandardDeviation": 4,
    "heartbeatLapseSec": 20,
    "heartbeatMaxBiggestDistanceFactor": 2.5,
    "startDelayMs": 500,
    "delayMs": 1000,
    "maxShardJoiningStateMs": 15000,
    "minHealthlyHeartbeatsForShardOnline": 2,
    "maxAbsentHeartbeatsBeforeShardGone": 5,
    "maxHeartbeatReceptionDelayFactorForSick": 3,
    "maxSickHeartbeatsBeforeShardQuarantine": 15,
    "minShardsOnlineBeforeSharding": 1
  },
  "distributor": {
    "roadmapMaxRetries": 0,
    "roadmapExpirationSec": 15,
    "delayMs": 5000,
    "startDelayMs": 10000,
    "reloadDutiesFromStorageEachPeriods": 10,
    "reloadDutiesFromStorage": false,
    "runConsistencyCheck": false
  },
  "balancer": {
    "spillOverMaxValue": 99999999999,
    "spillOverMaxUnit": "USE_CAPACITY",
    "evenLoadPresort": null,
    "roundRobinMaxDutiesDeltaBetweenShards": 0,
    "strategy": "EVEN_WEIGHT"
  },
  "follower": {
    "maxHeartbeatBuildFailsBeforeReleasing": 1,
    "heartbeatDelayMs": 2000,
    "heartbeatStartDelayMs": 1000,
    "heartattackCheckStartDelayMs": 10000,
    "heartattackCheckDelayMs": 10000,
    "clearanceCheckStartDelayMs": 20000,
    "clearanceCheckDelayMs": 10000,
    "clearanceMaxAbsenceMs": 30000,
    "maxHeartbeatAbsenceForReleaseMs": 10000
  },
  "broker": {
    "networkInterfase": "lo",
    "shardIdSuffix": "",
    "enablePortFallback": true,
    "retryDelayMs": 3000,
    "maxRetries": 300,
    "connectionHandlerThreads": 10,
    "hostPort": "192.168.1.102:9000"
  },
  "bootstrap": {
    "serviceName": "default-name",
    "webServerHostPort": "192.168.1.102:9100",
    "enableWebserver": true,
    "zookeeperHostPort": "localhost:2181",
    "leaderShardAlsoFollows": true,
    "publishLeaderCandidature": true,
    "readynessRetryDelayMs": 5000
  },
  "scheduler": {
    "semaphoreUnlockMaxRetries": 30,
    "semaphoreUnlockRetryDelayMs": 100,
    "maxConcurrency": 10
  }
}
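
Note how broker.hostPort, which newsampler.sh sets through a -D flag, shows up under the broker section above. Assuming those -D system properties map directly onto the config keys shown (an assumption, not verified against Minka's wiring), a shard could read such an override like this:

public class BrokerAddress {
    public static void main(String[] args) {
        // read the broker host:port override (as passed by newsampler.sh
        // with -Dbroker.hostPort=...), falling back to the stated default
        String hostPort = System.getProperty("broker.hostPort", "localhost:5748");
        int colon = hostPort.lastIndexOf(':');
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        System.out.println("broker binds " + host + " on port " + port);
    }
}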

Minka produces very noisy logging that allows debugging. If we need anything of importance beyond the webserver endpoints, we can tail the log files:

file           about
leader.log     the leader shard: results of the running phases
tasks.log      all shards: the scheduler and its frequent agents
broker.log     the TCP broker's input and output communication between shards
follower.log   all follower shards: their commands sent to the leader
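
For example, to follow the phase results on the shard that won leadership:

$ tail -f /var/log/minka/leader.log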