Overview

At a high level, the Collector core is a Jetty service intended to receive events and persist them to HDFS (Hadoop DFS). It provides additional services such as data validation and bucketing. The Collector is the core component of Ning's Analytics data pipeline.

The image below shows how we use it at Ning to gather data: each subsystem (either internal or external) that stores data into HDFS does so via the Collector APIs.

An event is a piece of information that occurred at a certain point in time. It may come from a user's browser, from another Java program, from a script that processes log files, and so on. In practice, it is a generic series of key-value pairs, defined by its schema. Any event sent to the collector core will show up in Hadoop, regardless of whether a schema has been defined. To document and maintain schema metadata, we use Goodwill at Ning.

Events are first spooled in a local directory (.diskpool by default) for aggregation before being pushed to Hadoop.


Getting started

Self-contained jars are published to Maven Central. To start the Collector locally:

java -Dcollector.event-output-directory=/var/tmp/collector -jar metrics.collector-*-jar-with-dependencies.jar

By default, it uses the local Hadoop filesystem. Events will show up in /var/tmp/collector.

Note: Available configuration options can be found here.


Client-side event logging (e.g. from JavaScript)

To instrument behaviors and interactions within a browser, the collector supports a GET endpoint. To send an event, issue a GET call to the collector's /1 or /2 endpoint. The event collector receives events encoded in the query string. It returns a 202 (Accepted) to indicate that a best effort will be made to get the event into HDFS (barring catastrophic failures such as flaming machines). If the collector is overloaded, it can return a 503 (Service Unavailable). For example:

curl -v http://127.0.0.1:8080/1?v=Hello,sWorld

will send a single Hello event, with a single value (the string World). Data is serialized in Thrift format on the way in.

Client-side API Version 1

The primary concern with the URL is to keep it shorter than 256 characters to avoid truncation by some browsers.

/1?v=<event name>,<type><value>,<type><value>,...

Field Types

Collector type | Equivalent Thrift type | Description | Notes
b | bool | A boolean value (true or false) | we use 0 or 1
1 | byte | An 8-bit signed integer |
2 | i16 | A 16-bit signed integer |
4 | i32 | A 32-bit signed integer |
8 | i64 | A 64-bit signed integer |
d | double | A 64-bit floating point number |
s | string | A text string encoded using UTF-8 encoding |
x | n/a | The value indicates an annotation that the event collector can compute at receipt of the event |

Custom Type functions

To save space in the URL, we have a custom type 'x' whose value the event collector can fill in. The valid values that may follow x include:

Annotation value | Meaning | Notes
date | Current time (millis since epoch) |
host | Referrer host of the request to the Collector (parsed from the Referrer header) |
path | Referrer path of the request to the Collector (parsed from the Referrer header) |
ua | User-Agent header from the request |
ip | IP address parsed from the X-Forwarded-For header in the request | Requires that you have a load balancer to add this header
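
For example (the event name and field values below are made up for illustration, not taken from the Collector source):

/1?v=PageView,xdate,xua,xip,s/pricing,442

would send a PageView event whose first three fields the collector fills in at receipt (timestamp in millis, User-Agent header, client IP from X-Forwarded-For) and whose last two fields are a string (/pricing) and a 32-bit integer (42).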

Client-side API Version 2 - Base64 Encode Numbers

Version 2 of the Collector's API encodes numbers in a custom base64 encoding. The URL format and types are the same as version 1:

/2?v=<event name>,<type><base64 value>,<type><base64 value>,...

Encoding schema

We map 0 - 63 to the following chars:

-
0-9
A-Z
_
a-z

So '-' is 0, 'z' is 63. Example:

/2?v=TestEvent,1-,20,4z,810

would result in an event:

type = TestEvent, byte: 0, short: 1, int: 63, long: 129
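
To make the mapping concrete, here is a minimal Java sketch of an encoder/decoder following the alphabet described above. It is illustrative only and is not the Collector's actual codec.

// Illustrative only: a codec for the custom base64 alphabet described above ('-' is 0, 'z' is 63)
public class CollectorBase64 {
    private static final String ALPHABET =
            "-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz";

    static String encode(long value) {
        if (value == 0) {
            return "-";
        }
        final StringBuilder sb = new StringBuilder();
        while (value > 0) {
            sb.insert(0, ALPHABET.charAt((int) (value % 64)));
            value /= 64;
        }
        return sb.toString();
    }

    static long decode(String encoded) {
        long value = 0;
        for (char c : encoded.toCharArray()) {
            value = value * 64 + ALPHABET.indexOf(c);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(encode(63));    // z
        System.out.println(encode(129));   // 10
        System.out.println(decode("1-"));  // 128
    }
}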

Server-side event logging (e.g. from Java, PHP)

On the server side, you should use the more efficient resource

POST /rest/1.0/event

which returns 202 if the payload was properly formed. It can also return 503, which means the collector is overloaded and the client should retry. Multiple formats are accepted: Smile, Json, Thrift.

Optionally, you may add a date= query parameter (in ISO8601 format) to set an event date other than the time it is received by the Collector:

POST /rest/1.0/event?name=EventType&date=2009-01-02T03:04:05.006Z

Json/Smile (recommended)

The collector accepts Json/Smile events:

{
    "eventName": ..., // Required: event name
    "payload":
    {
        "eventDate": ...,  // Optional, defaults to the timestamp when the event was deserialized on the collector
        "eventGranularity": ..., // Optional, defaults to HOURLY
        "fied1": ..., // Your stuff!
        "field2": ... // Your stuff!
    }
}

For instance:

echo '{ "eventName":"Hello", "payload": { "dontcare": "World" } }' > foo
curl -v -XPOST -H'Content-Type: application/json' -d@foo http://127.0.0.1:8080/rest/1.0/event
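
The same call can be made from Java with the JDK 11+ HttpClient; the class name and payload below are just an illustration:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PostJsonEvent {
    public static void main(String[] args) throws Exception {
        final String json = "{ \"eventName\": \"Hello\", \"payload\": { \"dontcare\": \"World\" } }";

        final HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8080/rest/1.0/event"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();

        final HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 202: accepted; 503: the collector is overloaded, retry later
        System.out.println(response.statusCode());
    }
}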

The request body can contain multiple events or an array of events:

{ "eventName":"Hello", "payload": { "dontcare": "World" } }
{ "eventName":"Hello", "payload": { "dontcare": "World" } }
{ "eventName":"Hello", "payload": { "dontcare": "World" } }

or

[{ "eventName":"Hello", "payload": { "dontcare": "World" } },
 { "eventName":"Hello", "payload": { "dontcare": "World" } },
 { "eventName":"Hello", "payload": { "dontcare": "World" } }]

Because the body can contain multiple events, and a subset of them can be malformed, the endpoint will always return 202 (accepted). You can, however, look for a 199 Warning header for exceptions on failed events.
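
For instance, a client batching events through the Java example above could surface those per-event failures by inspecting the header (response being the HttpResponse from that snippet):

// Hypothetical follow-up to the Java example above: log any 199 Warning header
response.headers().firstValue("Warning")
        .ifPresent(warning -> System.err.println("Some events were rejected: " + warning));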

For Json, use the application/json content-type. For Smile, use application/json+smile.

Raw Thrift via HTTP

You have to specify the event name in the request URL for Thrift:

POST /rest/1.0/event?name=EventType

Use the application/thrift content-type.

Thrift via Scribe

The Collector exposes a non-HTTP Thrift API: the Scribe API. You can log messages directly from Scribe or any Thrift application.

You can test the Thrift endpoint by using the test_collector_thrift_endpoint.py script provided in the Collector source code:

python src/test/py/test_collector_thrift_endpoint.py

Eventtracker Java library

To send event data from Java, you can use the eventtracker library. We use it at Ning to send application specific data, log4j logs, Jetty request logs, etc.

The eventtracker supports most of the Collector APIs described above.

Overview

Each event accepted via the offerEvent(Event) API is directly serialized to disk using the com.ning:metrics.serialization-writer library for persistence (except when using the GET API version 1). Events are then de-serialized and sent to the collector periodically and/or after a certain number of events have been offered.

The CollectorController class provides the commit() call to force a promotion from the temporary queue of events to the final queue: only events in the final queue are sent (a separate thread wakes up periodically to see if there is anything to send). The commit() call bypasses the promotion rules mentioned above.

One can force a flush to the final queue by calling flush() on the controller object. This forces all events in the queue to be sent remotely.
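
As a hypothetical sketch (controller here stands for the injected CollectorController described above):

// Promote everything from the temporary queue to the final queue...
controller.commit();
// ...and force all events in the final queue to be sent to the collector right away
controller.flush();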

Serialization to disk is done using the Java Serialization mechanisms (ObjectOutputStream to file). The whole Event object is written to disk. This is not configurable.

Final serialization on the wire is configurable (e.g. Json, Smile, Thrift). You can extend your event to support your own serialization protocol by implementing getSerializedBytes(). These bytes are sent on the wire.

eventtracker-smile usage

To use com.ning:metrics.eventtracker-smile, update your pom.xml:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>metrics.eventtracker-smile</artifactId>
    <version>4.0.8</version>
</dependency>

You may also need to add com.google.inject.extensions:guice-multibindings (3.0 or higher) and org.skife.config:config-magic (0.9 or higher) as dependencies.

Install the Guice modules:

install(new MBeanModule());
install(new CollectorControllerSmileModule());
install(new CollectorControllerHttpMBeanModule());

Create your event:

// Your POJO
HelloEvent event = new HelloEvent(...);
// The String is your event name
collector.offerEvent(SmileEnvelopeEvent.fromPOJO("Hello", Granularity.HOURLY, event));
// On shutdown, make sure to call collector.close() for cleanup

where collector is the CollectorController instance injected by the modules above.


Monitoring

This section describes how to get stats out of a running Collector.

Via HTTP

HTTP is probably the easiest way to get simple stats out of a running collector:

~ > curl http://127.0.0.1:8080/1.0/healthcheck?pretty=true
{
    "location" : "http://127.0.0.1:8080/1.0/healthcheck",
    "size" : 3,
    "formatVersion" : "1",
    "generated" : "2011-07-26T04:05:21.440Z",
    "items" : [ {
        "type" : "WriterHealthCheck",
        "code" : "OK",
        "message" : "running: true, local files: {/events/MyAwesomeEvent/2011/07/26/04|thrift: 0}, enqueued: 928839, written: 928839, dropped: 0, errored: 0, ignored: 0, flushes: 50212"
    }, {
        "type" : "RealtimeHealthCheck",
        "code" : "OK",
        "message" : "enabled: true, running: true, types: [MyAwesomeEvent, MyAwesomeEvent2], queue sizes: {MyAwesomeEvent: 0, MyAwesomeEvent2: 0}, enqueued: 76257, sent: 76257, dropped: 0, errored: 0, ignored: 852582"
    }, {
        "type" : "HadoopHealthCheck",
        "code" : "OK",
        "message" : "<your hadoop conf>"
    } ]
}

Via JMX

The 15-second test - is a collector falling over?

Load overview

To get a sense of how busy a collector is, look at the com.ning.metrics.collector.hadoop.processing:type=BufferingEventCollector,name=collectEvent bean. It gives you the rate of events being received across all APIs (the collectEvent function dispatches to both ActiveMQ and HDFS).

The com.ning.metrics.collector:name=BufferingEventCollector bean has more details.

The com.ning.metrics.collector:name=WriterQueueStats bean gives write rates to HDFS.

Similar stats are available for ActiveMQ, under com.ning.metrics.collector:name=RTQueueStats (stats are further broken down per Event type).
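
If you prefer to script this rather than use jconsole, a minimal JMX client can dump any of these beans. The sketch below is not part of the Collector; it assumes the collector's JVM exposes remote JMX on port 8989 (adjust to your own com.sun.management.jmxremote.port setting), and the bean name comes from the list above.

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class DumpCollectorBean {
    public static void main(String[] args) throws Exception {
        final JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:8989/jmxrmi");
        final JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            final MBeanServerConnection connection = connector.getMBeanServerConnection();
            final ObjectName name =
                    new ObjectName("com.ning.metrics.collector:name=BufferingEventCollector");
            // Print every attribute exposed by the bean
            for (MBeanAttributeInfo attribute : connection.getMBeanInfo(name).getAttributes()) {
                System.out.println(attribute.getName() + " = "
                        + connection.getAttribute(name, attribute.getName()));
            }
        } finally {
            connector.close();
        }
    }
}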

Disabling flushes to ActiveMQ or HDFS

In case of an HDFS outage, the collector can buffer events locally: see the enableFlush and disableFlush operations on the com.ning.metrics.collector:name=HDFSWriter bean. The processLeftBelowFiles operation can be used to force a flush (it will process all local files, including quarantined ones).

A similar knob for ActiveMQ can be found under com.ning.metrics.collector:name=RTSubsystem.
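
Continuing the JMX sketch above, the same connection can invoke these operations; the operation names are the ones listed in this section.

// Stop flushes to HDFS during the outage...
final ObjectName hdfsWriter = new ObjectName("com.ning.metrics.collector:name=HDFSWriter");
connection.invoke(hdfsWriter, "disableFlush", new Object[0], new String[0]);
// ...then re-enable them and force the local backlog (including quarantined files) to be processed
connection.invoke(hdfsWriter, "enableFlush", new Object[0], new String[0]);
connection.invoke(hdfsWriter, "processLeftBelowFiles", new Object[0], new String[0]);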

Stats per API

Stats (acceptance rate and latency) per API are exposed under the com.ning.metrics.collector.endpoint.resources beans.

The beans under com.ning.metrics.collector.endpoint.resources:type=EventRequestHandler drill into the stats (success and failure rates) of the GET API deserializer, per payload type.