Overview
Tyk Streams configuration is specified using YAML. The configuration consists of several main sections: input, pipeline, output and, optionally, logger.
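A minimal sketch of this layout is shown below. The kafka input, avro processor and http_client output used here are documented later in this section; addresses, topics and URLs are illustrative, and the logger field names are assumptions.

```yaml
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ orders ]
    consumer_group: tyk_streams

pipeline:
  processors:
    - avro:
        operator: to_json

output:
  http_client:
    url: https://example.com/ingest
    verb: POST

logger:
  level: INFO   # assumed field name for the log level
  format: json  # assumed field name for the output format
```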
Input
The input section defines the source of the data stream. Tyk Streams supports various input types such as Kafka, HTTP and MQTT. Each input type has specific configuration parameters.
Pipeline
The pipeline section defines the processing steps applied to the data. It includes processors for filtering, mapping, enriching and transforming the data. Processors can be chained together.
Output
The output section specifies the destination of the processed data. As with inputs, Tyk Streams supports various output types such as Kafka, HTTP etc.
Logger (Optional)
The logger section is used to configure logging options, such as log level and output format.

Inputs
Overview
An input is a source of data piped through an array of optional processors.
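For example, the sketch below attaches an optional processor to a single input (the kafka input and avro processor are documented later in this section; nesting processors under input is assumed to be the supported layout, and values are illustrative):

```yaml
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ orders ]

  # Optional processors applied only to messages read from this input.
  processors:
    - avro:
        operator: to_json
```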
Brokering
Only one input is configured at the root of a Tyk Streams config. However, the root input can be a broker which combines multiple inputs and merges the streams.
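A sketch of a root broker merging two child inputs (both input types and the broker fields are documented below; values are illustrative):

```yaml
input:
  broker:
    inputs:
      - kafka:
          addresses: [ localhost:9092 ]
          topics: [ orders ]

      - http_server:
          path: /post
```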
Labels
Inputs have an optional field label that can uniquely identify them in observability data such as logs.
Broker
Allows you to combine multiple inputs into a single stream of data, where each input will be read in parallel.
Batching
It’s possible to configure a batch policy with a broker using the batching fields. When doing this the feeds from all child inputs are combined. Some inputs do not support broker-based batching and specify this in their documentation.
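A sketch of broker-level batching combining the feeds of two child inputs (fields as documented below; values are illustrative):

```yaml
input:
  broker:
    inputs:
      - kafka:
          addresses: [ localhost:9092 ]
          topics: [ orders ]
      - kafka:
          addresses: [ localhost:9092 ]
          topics: [ refunds ]
    batching:
      count: 50    # flush after 50 messages...
      period: 1s   # ...or after one second, whichever comes first
```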
Processors
It is possible to configure processors at the broker level, where they will be applied to all child inputs, as well as on the individual child inputs. If you have processors at both the broker level and on child inputs then the broker processors will be applied after the child inputs' processors.

Fields
copies
Whatever is specified within inputs will be created this many times.
Type: int
Default: 1
inputs
A list of inputs to create. Type:array
batching
Allows you to configure a batching policy. Type:object
batching.count
A number of messages at which the batch should be flushed. Set this to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. Set this to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size. Type:string
Default: ""
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. Type:array
HTTP Client
Connects to a server and continuously performs requests for a single message.
Streaming
If you enable streaming then Tyk Streams will consume the body of the response as a continuous stream of data. This allows you to consume APIs that provide long-lived streamed data feeds (such as Twitter).
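A sketch of a streaming consumer (the url is illustrative; the stream fields are documented below):

```yaml
input:
  http_client:
    url: https://example.com/feed/stream
    verb: GET
    stream:
      enabled: true
      reconnect: true
```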
Pagination
This input supports interpolation functions in the url and headers fields, where data from the previous successfully consumed message (if there was one) can be referenced. This can be used in order to support basic levels of pagination.
Examples
Basic Pagination
Interpolation functions within the url and headers fields can be used to reference the previously consumed message, which allows simple pagination.
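A sketch, assuming Bloblang-style interpolation (such as the json function) is available in these fields; the API, query parameter and next_page_token key are illustrative:

```yaml
input:
  http_client:
    # Each request can reference a field of the previously consumed message (if any).
    url: 'https://api.example.com/items?page_token=${! json("next_page_token") }'
    verb: GET
    headers:
      Content-Type: application/json
```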
Fields
url
The URL to connect to. Type:string
verb
A verb to connect with Type:string
Default: "GET"
headers
A map of headers to add to the request. Type:object
Default: {}
metadata
Specify optional matching rules to determine which metadata keys should be added to the HTTP request as headers. Type:object
metadata.include_prefixes
Provide a list of explicit metadata key prefixes to match against. Type:array
Default: []
metadata.include_patterns
Provide a list of explicit metadata key regular expression (re2) patterns to match against. Type:array
Default: []
dump_request_log_level
Optionally set a level at which the request and response payload of each request made will be logged. Type:string
Default: ""
Options: TRACE, DEBUG, INFO, WARN, ERROR, FATAL, "".
oauth
Allows you to specify open authentication via OAuth version 1. Type:object
oauth.enabled
Whether to use OAuth version 1 in requests. Type:bool
Default: false
oauth.consumer_key
A value used to identify the client to the service provider. Type:string
Default: ""
oauth.consumer_secret
A secret used to establish ownership of the consumer key. Type:string
Default: ""
oauth.access_token
A value used to gain access to the protected resources on behalf of the user. Type:string
Default: ""
oauth.access_token_secret
A secret provided in order to establish ownership of a given access token. Type:string
Default: ""
oauth2
Allows you to specify open authentication via OAuth version 2 using the client credentials token flow. Type:object
oauth2.enabled
Whether to use OAuth version 2 in requests. Type:bool
Default: false
oauth2.client_key
A value used to identify the client to the token provider. Type:string
Default: ""
oauth2.client_secret
A secret used to establish ownership of the client key. Type:string
Default: ""
oauth2.token_url
The URL of the token provider. Type:string
Default: ""
oauth2.scopes
A list of optional requested permissions. Type:array
Default: []
oauth2.endpoint_params
A list of optional endpoint parameters, values should be arrays of strings. Type:object
Default: {}
basic_auth
Allows you to specify basic authentication. Type:object
basic_auth.enabled
Whether to use basic authentication in requests. Type:bool
Default: false
basic_auth.username
A username to authenticate as. Type:string
Default: ""
basic_auth.password
A password to authenticate with. Type:string
Default: ""
jwt
Allows you to specify JWT authentication. Type:object
jwt.enabled
Whether to use JWT authentication in requests. Type:bool
Default: false
jwt.private_key_file
A file containing the private key, PEM encoded via PKCS#1 or PKCS#8. Type: string
Default: ""
jwt.signing_method
A method used to sign the token such as RS256, RS384, RS512 or EdDSA. Type:string
Default: ""
jwt.claims
A value used to identify the claims that issued the JWT. Type:object
Default: {}
jwt.headers
Add optional key/value headers to the JWT. Type:object
Default: {}
tls
Custom TLS settings can be used to override system defaults. Type:object
tls.enabled
Whether custom TLS settings are enabled. Type:bool
Default: false
tls.skip_cert_verify
Whether to skip server side certificate verification. Type:bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.client_certs
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: array
Default: []
tls.client_certs[].cert
A plain text certificate to use. Type:string
Default: ""
tls.client_certs[].key
A plain text certificate key to use. Type:string
Default: ""
tls.client_certs[].cert_file
The path of a certificate to use. Type:string
Default: ""
tls.client_certs[].key_file
The path of a certificate key to use. Type:string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. Warning: Since it does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
Type: string
Default: ""
extract_headers
Specify which response headers should be added to resulting messages as metadata. Header keys are lowercased before matching, so ensure that your patterns target lowercased versions of the header keys that you expect. Type:object
extract_headers.include_prefixes
Provide a list of explicit metadata key prefixes to match against. Type:array
Default: []
extract_headers.include_patterns
Provide a list of explicit metadata key regular expression (re2) patterns to match against. Type:array
Default: []
timeout
A static timeout to apply to requests. Type:string
Default: "5s"
retry_period
The base period to wait between failed requests. Type:string
Default: "1s"
max_retry_backoff
The maximum period to wait between failed requests. Type:string
Default: "300s"
retries
The maximum number of retry attempts to make. Type:int
Default: 3
backoff_on
A list of status codes whereby the request should be considered to have failed and retries should be attempted, but the period between them should be increased gradually. Type:array
Default: [429]
drop_on
A list of status codes whereby the request should be considered to have failed but retries should not be attempted. This is useful for preventing wasted retries for requests that will never succeed. Note that with these status codes the request is dropped, but the message that caused the request will not be dropped. Type: array
Default: []
successful_on
A list of status codes whereby the attempt should be considered successful. This is useful for dropping requests that return non-2XX codes indicating that the message has been dealt with, such as a 303 See Other or a 409 Conflict. All 2XX codes are considered successful unless they are present within backoff_on or drop_on, regardless of this field.
Type: array
Default: []
proxy_url
An optional HTTP proxy URL. Type:string
payload
An optional payload to deliver for each request. Type:string
drop_empty_bodies
Whether empty payloads received from the target server should be dropped. Type:bool
Default: true
stream
Allows you to set streaming mode, where requests are kept open and messages are processed line-by-line. Type:object
stream.enabled
Enables streaming mode. Type:bool
Default: false
stream.reconnect
Sets whether to re-establish the connection once it is lost. Type:bool
Default: true
auto_replay_nacks
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams, as the original shape of the data can be discarded immediately upon consumption and mutation.
Type: bool
Default: true
HTTP Server
Receive messages POSTed over HTTP(S). HTTP 2.0 is supported when using TLS, which is enabled when key and cert files are specified.
Responses
Endpoints
The following fields specify endpoints that are registered for sending messages, and support path parameters of the form /{foo}, which are added to ingested messages as metadata. A path ending in / will match against all extensions of that path:
path (defaults to /post)
This endpoint expects POST requests where the entire request body is consumed as a single message.
If the request contains a multipart content-type header as per RFC 1341 then the multiple parts are consumed as a batch of messages, where each body part is a message of the batch.
ws_path (defaults to /post/ws)
Creates a websocket connection, where payloads received on the socket are passed through the pipeline as a batch of one message.
Please note that components within a Tyk Streams config will register their respective endpoints in a non-deterministic order. This means that establishing precedence of endpoints that are registered via multiple http_server inputs or outputs (either within brokers or from cohabiting streams) is not possible in a predictable way.
This ambiguity makes it difficult to ensure that paths which are both a subset of a path registered by a separate component and end in a slash (/), and will therefore match against all extensions of that path, do not prevent the more specific path from matching against requests.
It is therefore recommended that you ensure paths of separate components do not collide unless they are explicitly non-competing.
For example, if you were to deploy two separate http_server inputs, one with a path /foo/ and the other with a path /foo/bar, it would not be possible to ensure that the path /foo/ does not swallow requests made to /foo/bar.
You may specify an optional ws_welcome_message, which is a static payload to be sent to all clients once a websocket connection is first established.
Metadata
This input adds the following metadata fields to each message:

Examples
Path Switching
This example shows an http_server input that captures all requests and processes them by switching on the request path:
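A hedged sketch: it assumes the switch and mapping processors and the http_server_request_path metadata key are available, none of which are documented in this section.

```yaml
input:
  http_server:
    path: /
    allowed_verbs: [ GET, POST ]

pipeline:
  processors:
    - switch:
        # Route processing based on the request path captured as metadata.
        - check: '@http_server_request_path == "/foo"'
          processors:
            - mapping: 'root = content().string().uppercase()'
        - check: '@http_server_request_path == "/bar"'
          processors:
            - mapping: 'root = content().string().lowercase()'
```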
Mock OAuth 2.0 Server
This example shows an http_server input that mocks an OAuth 2.0 Client Credentials flow server at the endpoint /oauth2_test:
Fields
address
An alternative address to host from. If left empty the service wide address is used. Type:string
Default: ""
path
The endpoint path to listen for POST requests. Type:string
Default: "/post"
ws_path
The endpoint path to create websocket connections from. Type:string
Default: "/post/ws"
ws_welcome_message
An optional message to deliver to fresh websocket connections. Type:string
Default: ""
allowed_verbs
An array of verbs that are allowed for the path endpoint.
Type: array
Default: ["POST"]
Requires version 3.33.0 or newer
timeout
Timeout for requests. If a consumed message takes longer than this to be delivered the connection is closed, but the message may still be delivered. Type: string
Default: "5s"
cert_file
Enable TLS by specifying a certificate and key file. Only valid with a custom address.
Type: string
Default: ""
key_file
Enable TLS by specifying a certificate and key file. Only valid with a custom address.
Type: string
Default: ""
cors
Adds Cross-Origin Resource Sharing headers. Only valid with a custom address.
Type: object
Requires version 3.63.0 or newer
cors.enabled
Whether to allow CORS requests. Type:bool
Default: false
cors.allowed_origins
An explicit list of origins that are allowed for CORS requests. Type:array
Default: []
sync_response
Customize messages returned via synchronous responses. Type:object
sync_response.status
Specify the status code to return with synchronous responses. This is a string value, which allows you to customize it based on resulting payloads and their metadata. Type:string
Default: "200"
sync_response.headers
Specify headers to return with synchronous responses. Type:object
Default: {"Content-Type":"application/octet-stream"}
sync_response.metadata_headers
Specify criteria for which metadata values are added to the response as headers. Type:object
sync_response.metadata_headers.include_prefixes
Provide a list of explicit metadata key prefixes to match against. Type:array
Default: []
sync_response.metadata_headers.include_patterns
Provide a list of explicit metadata key regular expression (re2) patterns to match against. Type:array
Default: []
Kafka
Connects to Kafka brokers and consumes one or more topics.
Setting the checkpoint_limit field to 1 forces partitions to be processed in lock-step, where a message will only be processed once the prior message is delivered.
Batching messages before processing can be enabled using the batching field, and this batching is performed per-partition such that messages of a batch will always originate from the same partition. This batching mechanism is capable of creating batches of greater size than the checkpoint_limit, in which case the next batch will only be created upon delivery of the current one.
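A sketch using fields documented below (addresses, topics and the consumer group are illustrative):

```yaml
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ orders, refunds ]
    consumer_group: tyk_streams_group
    checkpoint_limit: 1024
    batching:
      count: 20
      period: 1s
```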
Metadata
This input adds the following metadata fields to each message:

kafka_lag
The calculated difference between the high water mark offset of the partition at the time of ingestion and the current message offset.
Ordering
By default messages of a topic partition can be processed in parallel, up to a limit determined by the field checkpoint_limit. However, if strict ordered processing is required then this value must be set to 1 in order to process partition messages in lock-step. When doing so it is recommended that you perform batching at this component for performance, as it will not be possible to batch lock-stepped messages at the output level.
Troubleshooting
- I’m seeing logs that report Failed to connect to kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?), but the brokers are definitely reachable.
Fields
addresses
A list of broker addresses to connect to. If an item of the list contains commas it will be expanded into multiple addresses. Type:array
topics
A list of topics to consume from. Multiple comma separated topics can be listed in a single element. Partitions are automatically distributed across consumers of a topic. Alternatively, it’s possible to specify explicit partitions to consume from with a colon after the topic name, e.g. foo:0 would consume partition 0 of the topic foo. This syntax supports ranges, e.g. foo:0-10 would consume partitions 0 through to 10 inclusive.
Type: array
Requires version 3.33.0 or newer
target_version
The version of the Kafka protocol to use. This limits the capabilities used by the client and should ideally match the version of your brokers. Defaults to the oldest supported stable version. Type:string
tls
Custom TLS settings can be used to override system defaults. Type:object
tls.enabled
Whether custom TLS settings are enabled. Type:bool
Default: false
tls.skip_cert_verify
Whether to skip server side certificate verification. Type:bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
Requires version 3.45.0 or newer
tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.client_certs
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: array
Default: []
tls.client_certs[].cert
A plain text certificate to use. Type:string
Default: ""
tls.client_certs[].key
A plain text certificate key to use. Type:string
Default: ""
tls.client_certs[].cert_file
The path of a certificate to use. Type:string
Default: ""
tls.client_certs[].key_file
The path of a certificate key to use. Type:string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. Warning: Since it does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
Type: string
Default: ""
sasl
Enables SASL authentication. Type:object
sasl.mechanism
The SASL authentication mechanism. If left empty, SASL authentication is not used. Type: string
Default: "none"
Option | Summary |
---|---|
OAUTHBEARER | OAuth Bearer based authentication. |
PLAIN | Plain text authentication. NOTE: When using plain text auth it is extremely likely that you’ll also need to enable TLS. |
SCRAM-SHA-256 | Authentication using the SCRAM-SHA-256 mechanism. |
SCRAM-SHA-512 | Authentication using the SCRAM-SHA-512 mechanism. |
none | Default, no SASL authentication. |
sasl.user
A PLAIN username. It is recommended that you use environment variables to populate this field. Type:string
Default: ""
sasl.password
A PLAIN password. It is recommended that you use environment variables to populate this field. Type:string
Default: ""
sasl.access_token
A static OAUTHBEARER access token. Type: string
Default: ""
sasl.token_cache
Instead of using a static access_token, allows you to query a cache resource to fetch OAUTHBEARER tokens from. Type: string
Default: ""
sasl.token_key
Required when using a token_cache, the key to query the cache with for tokens.
Type: string
Default: ""
consumer_group
An identifier for the consumer group of the connection. This field can be explicitly made empty in order to disable stored offsets for the consumed topic partitions. Type:string
Default: ""
client_id
An identifier for the client connection. Type:string
Default: "tyk"
rack_id
A rack identifier for this client. Type:string
Default: ""
start_from_oldest
Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists. Type:bool
Default: true
checkpoint_limit
The maximum number of messages of the same topic and partition that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level to work on individual partitions. Any given offset will not be committed unless all messages under that offset are delivered in order to preserve at least once delivery guarantees. Type:int
Default: 1024
Requires version 3.33.0 or newer
auto_replay_nacks
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams, as the original shape of the data can be discarded immediately upon consumption and mutation.
Type: bool
Default: true
commit_period
The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown. Type:string
Default: "1s"
max_processing_period
A maximum estimate for the time taken to process a message, this is used for tuning consumer group synchronization. Type:string
Default: "100ms"
group
Tuning parameters for consumer group synchronization. Type:object
group.session_timeout
A period after which a consumer of the group is kicked if no heartbeats are received. Type: string
Default: "10s"
group.heartbeat_interval
A period in which heartbeats should be sent out. Type:string
Default: "3s"
group.rebalance_timeout
A period after which rebalancing is abandoned if unresolved. Type:string
Default: "60s"
fetch_buffer_cap
The maximum number of unprocessed messages to fetch at a given time. Type:int
Default: 256
multi_header
Decode headers into lists to allow handling of multiple values with the same key Type:bool
Default: false
batching
Allows you to configure a batching policy. Type:object
batching.count
A number of messages at which the batch should be flushed. Set this to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. Set this to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size. Type:string
Default: ""
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. Type:array
Outputs
Overview
An output is a sink where we wish to send our consumed data after applying an optional array of processors. Only one output is configured at the root of a Tyk Streams config. However, the output can be a broker which combines multiple outputs under a chosen brokering pattern. An output config section looks like this:
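A sketch of that layout (the kafka output and its fields are documented later in this section; nesting processors under output is assumed to mirror inputs, and values are illustrative):

```yaml
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed_orders

  # Optional processors applied to messages before they are written.
  processors:
    - avro:
        operator: from_json
```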
Labels
Outputs have an optional field label that can uniquely identify them in observability data such as logs.
Broker
Allows you to route messages to multiple child outputs using a range of brokering patterns.
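A sketch of a fan_out broker writing every message to both child outputs (fields documented below; values are illustrative):

```yaml
output:
  broker:
    pattern: fan_out
    outputs:
      - kafka:
          addresses: [ localhost:9092 ]
          topic: orders_archive

      - http_client:
          url: https://example.com/ingest
          verb: POST
```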
Fields
copies
The number of copies of each configured output to spawn. Type:int
Default: 1
pattern
The brokering pattern to use. Type:string
Default: "fan_out"
Options: fan_out, fan_out_fail_fast, fan_out_sequential, fan_out_sequential_fail_fast, round_robin, greedy.
outputs
A list of child outputs to broker. Type:array
batching
Allows you to configure a batching policy. Type:object
batching.count
A number of messages at which the batch should be flushed. Set this to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. Set this to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size. Type:string
Default: ""
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. Type:array
Patterns
The broker pattern determines the way in which messages are allocated and can be chosen from the following:

fan_out
With the fan out pattern all outputs will be sent every message that passes through Tyk Streams in parallel. If an output applies back pressure it will block all subsequent messages, and if an output fails to send a message it will be retried continuously until completion or service shut down. This mechanism is in place in order to prevent one bad output from causing a larger retry loop that results in a good output receiving unbounded message duplicates.

fan_out_fail_fast
The same as the fan_out pattern, except that output failures will not be automatically retried. This pattern should be used with caution as busy retry loops could result in unlimited duplicates being introduced into the non-failure outputs.

fan_out_sequential
Similar to the fan out pattern except outputs are written to sequentially, meaning an output is only written to once the preceding output has confirmed receipt of the same message. If an output applies back pressure it will block all subsequent messages, and if an output fails to send a message it will be retried continuously until completion or service shut down. This mechanism is in place in order to prevent one bad output from causing a larger retry loop that results in a good output receiving unbounded message duplicates.

fan_out_sequential_fail_fast
The same as the fan_out_sequential pattern, except that output failures will not be automatically retried. This pattern should be used with caution as busy retry loops could result in unlimited duplicates being introduced into the non-failure outputs.

round_robin
With the round robin pattern each message will be assigned a single output following their order. If an output applies back pressure it will block all subsequent messages. If an output fails to send a message then the message will be re-attempted with the next output, and so on.

greedy
The greedy pattern results in higher output throughput at the cost of potentially disproportionate message allocations to those outputs. Each message is sent to a single output, which is determined by allowing outputs to claim messages as soon as they are able to process them. This results in certain faster outputs potentially processing more messages at the cost of slower outputs.

HTTP Client
Sends messages to an HTTP server.

Whether batches are sent as a single multipart request is controlled by the field batch_as_multipart (documented below), which defaults to false; in that case the messages of a batch are sent as individual requests.
Propagating Responses
It’s possible to propagate the response from each HTTP request back to the input source by setting propagate_response to true. Only inputs that support synchronous responses are able to make use of these propagated responses.
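A sketch pairing this output with an http_server input, which supports synchronous responses (the url is illustrative):

```yaml
input:
  http_server:
    path: /process

output:
  http_client:
    url: https://example.com/enrich
    verb: POST
    propagate_response: true
```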
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field max_in_flight.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level.
Fields
url
The URL to connect to. Type:string
verb
A verb to connect with Type:string
Default: "POST"
headers
A map of headers to add to the request. Type:object
Default: {}
metadata
Specify optional matching rules to determine which metadata keys should be added to the HTTP request as headers. Type:object
metadata.include_prefixes
Provide a list of explicit metadata key prefixes to match against. Type:array
Default: []
metadata.include_patterns
Provide a list of explicit metadata key regular expression (re2) patterns to match against. Type:array
Default: []
dump_request_log_level
Optionally set a level at which the request and response payload of each request made will be logged. Type:string
Default: ""
Options: TRACE, DEBUG, INFO, WARN, ERROR, FATAL, "".
oauth
Allows you to specify open authentication via OAuth version 1. Type:object
oauth.enabled
Whether to use OAuth version 1 in requests. Type:bool
Default: false
oauth.consumer_key
A value used to identify the client to the service provider. Type:string
Default: ""
oauth.consumer_secret
A secret used to establish ownership of the consumer key. Type:string
Default: ""
oauth.access_token
A value used to gain access to the protected resources on behalf of the user. Type:string
Default: ""
oauth.access_token_secret
A secret provided in order to establish ownership of a given access token. Type:string
Default: ""
oauth2
Allows you to specify open authentication via OAuth version 2 using the client credentials token flow. Type:object
oauth2.enabled
Whether to use OAuth version 2 in requests. Type:bool
Default: false
oauth2.client_key
A value used to identify the client to the token provider. Type:string
Default: ""
oauth2.client_secret
A secret used to establish ownership of the client key. Type:string
Default: ""
oauth2.token_url
The URL of the token provider. Type:string
Default: ""
oauth2.scopes
A list of optional requested permissions. Type:array
Default: []
oauth2.endpoint_params
A list of optional endpoint parameters, values should be arrays of strings. Type:object
Default: {}
basic_auth
Allows you to specify basic authentication. Type:object
basic_auth.enabled
Whether to use basic authentication in requests. Type:bool
Default: false
basic_auth.username
A username to authenticate as. Type:string
Default: ""
basic_auth.password
A password to authenticate with. Type:string
Default: ""
jwt
Allows you to specify JWT authentication. Type:object
jwt.enabled
Whether to use JWT authentication in requests. Type:bool
Default: false
jwt.private_key_file
A file containing the private key, PEM encoded via PKCS#1 or PKCS#8. Type: string
Default: ""
jwt.signing_method
A method used to sign the token such as RS256, RS384, RS512 or EdDSA. Type:string
Default: ""
jwt.claims
A value used to identify the claims that issued the JWT. Type:object
Default: {}
jwt.headers
Add optional key/value headers to the JWT. Type:object
Default: {}
tls
Custom TLS settings can be used to override system defaults. Type:object
tls.enabled
Whether custom TLS settings are enabled. Type:bool
Default: false
tls.skip_cert_verify
Whether to skip server side certificate verification. Type:bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.client_certs
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: array
Default: []
tls.client_certs[].cert
A plain text certificate to use. Type:string
Default: ""
tls.client_certs[].key
A plain text certificate key to use. Type:string
Default: ""
tls.client_certs[].cert_file
The path of a certificate to use. Type:string
Default: ""
tls.client_certs[].key_file
The path of a certificate key to use. Type:string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. Warning: Since it does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
Type: string
Default: ""
extract_headers
Specify which response headers should be added to resulting synchronous response messages as metadata. Header keys are lowercased before matching, so ensure that your patterns target lowercased versions of the header keys that you expect. This field is not applicable unless propagate_response is set to true.
Type: object
extract_headers.include_prefixes
Provide a list of explicit metadata key prefixes to match against. Type:array
Default: []
extract_headers.include_patterns
Provide a list of explicit metadata key regular expression (re2) patterns to match against. Type:array
Default: []
timeout
A static timeout to apply to requests. Type:string
Default: "5s"
retry_period
The base period to wait between failed requests. Type:string
Default: "1s"
max_retry_backoff
The maximum period to wait between failed requests. Type:string
Default: "300s"
retries
The maximum number of retry attempts to make. Type:int
Default: 3
backoff_on
A list of status codes whereby the request should be considered to have failed and retries should be attempted, but the period between them should be increased gradually. Type:array
Default: [429]
drop_on
A list of status codes whereby the request should be considered to have failed but retries should not be attempted. This is useful for preventing wasted retries for requests that will never succeed. Note that with these status codes the request is dropped, but the message that caused the request will not be dropped. Type: array
Default: []
successful_on
A list of status codes whereby the attempt should be considered successful. This is useful for dropping requests that return non-2XX codes indicating that the message has been dealt with, such as a 303 See Other or a 409 Conflict. All 2XX codes are considered successful unless they are present within backoff_on or drop_on, regardless of this field.
Type: array
Default: []
proxy_url
An optional HTTP proxy URL. Type:string
batch_as_multipart
Send message batches as a single request using RFC1341. If disabled messages in batches will be sent as individual requests. Type:bool
Default: false
propagate_response
Whether responses from the server should be propagated back to the input. Type:bool
Default: false
max_in_flight
The maximum number of parallel message batches to have in flight at any given time. Type:int
Default: 64
batching
Allows you to configure a batching policy. Type:object
batching.count
A number of messages at which the batch should be flushed. Set this to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. Set this to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size. Type:string
Default: ""
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. Type:array
multipart
Create explicit multipart HTTP requests by specifying an array of parts to add to the request, each part specified consists of content headers and a data field that can be populated dynamically. If this field is populated it will override the default request creation behavior. Type:array
Default: []
multipart[].content_type
The content type of the individual message part. Type:string
Default: ""
multipart[].content_disposition
The content disposition of the individual message part. Type:string
Default: ""
multipart[].body
The body of the individual message part. Type:string
Default: ""
HTTP Server
Sets up an HTTP server that will send messages over HTTP(S) GET requests. HTTP 2.0 is supported when using TLS, which is enabled when key and cert files are specified.
Three endpoints are registered at the paths specified by the fields path, stream_path and ws_path, which allow you to consume a single message batch, a continuous stream of line-delimited messages, or a websocket of messages for each request respectively.
When messages are batched the path endpoint encodes the batch according to RFC 1341.
Please note, messages are considered delivered as soon as the data is written to the client. There is no concept of at least once delivery on this output.
Please note that components within a Tyk Streams config will register their respective endpoints in a non-deterministic order. This means that establishing precedence of endpoints that are registered via multiple http_server inputs or outputs (either within brokers or from cohabiting streams) is not possible in a predictable way.
This ambiguity makes it difficult to ensure that paths which are both a subset of a path registered by a separate component and end in a slash (/), and will therefore match against all extensions of that path, do not prevent the more specific path from matching against requests.
It is therefore recommended that you ensure paths of separate components do not collide unless they are explicitly non-competing.
For example, if you were to deploy two separate http_server inputs, one with a path /foo/ and the other with a path /foo/bar, it would not be possible to ensure that the path /foo/ does not swallow requests made to /foo/bar.
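A sketch using the default paths documented below, shown explicitly for clarity:

```yaml
output:
  http_server:
    path: /get
    stream_path: /get/stream
    ws_path: /get/ws
    allowed_verbs: [ GET ]
```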
Fields
address
An alternative address to host from. If left empty the service wide address is used. Type:string
Default: ""
path
The path from which discrete messages can be consumed. Type:string
Default: "/get"
stream_path
The path from which a continuous stream of messages can be consumed. Type:string
Default: "/get/stream"
ws_path
The path from which websocket connections can be established. Type:string
Default: "/get/ws"
allowed_verbs
An array of verbs that are allowed for the path and stream_path HTTP endpoints.
Type: array
Default: ["GET"]
timeout
The maximum time to wait before a blocking, inactive connection is dropped (only applies to the path endpoint).
Type: string
Default: "5s"
cert_file
Enable TLS by specifying a certificate and key file. Only valid with a custom address.
Type: string
Default: ""
key_file
Enable TLS by specifying a certificate and key file. Only valid with a custom address.
Type: string
Default: ""
cors
Adds Cross-Origin Resource Sharing headers. Only valid with a custom address.
Type: object
cors.enabled
Whether to allow CORS requests. Type:bool
Default: false
cors.allowed_origins
An explicit list of origins that are allowed for CORS requests. Type:array
Default: []
Kafka
The kafka output type writes a batch of messages to Kafka brokers and waits for acknowledgment before propagating it back to the input.
The field ack_replicas determines whether we wait for acknowledgment from all replicas or just a single broker.
Metadata will be added to each message sent as headers (version 0.11+), but can be restricted using the field metadata.
Strict Ordering and Retries
When strict ordering is required for messages written to topic partitions it is important to ensure that both the field max_in_flight is set to 1 and that the field retry_as_batch is set to true.
You must also ensure that failed batches are never rerouted back to the same output. This can be done by setting the field max_retries to 0 and backoff.max_elapsed_time to empty, which will apply back pressure indefinitely until the batch is sent successfully.
However, this also means that manual intervention will eventually be required in cases where the batch cannot be sent due to configuration problems such as an incorrect max_msg_bytes estimate. A less strict but automated alternative would be to route failed batches to a dead letter queue using a fallback broker, but this would allow subsequent batches to be delivered in the meantime whilst those failed batches are dealt with.
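A sketch of the strict-ordering configuration described above (addresses and topic are illustrative):

```yaml
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: ordered_events
    max_in_flight: 1       # one batch in flight at a time
    retry_as_batch: true   # retry whole batches to preserve ordering
    max_retries: 0         # no discrete retry limit
    backoff:
      max_elapsed_time: "" # retry indefinitely, applying back pressure
```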
Troubleshooting
- I’m seeing logs that report Failed to connect to kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?), but the brokers are definitely reachable.
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field max_in_flight.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level.
Fields
addresses
A list of broker addresses to connect to. If an item of the list contains commas it will be expanded into multiple addresses. Type:array
tls
Custom TLS settings can be used to override system defaults. Type:object
tls.enabled
Whether custom TLS settings are enabled. Type:bool
Default: false
tls.skip_cert_verify
Whether to skip server side certificate verification. Type:bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. Type:string
Default: ""
tls.client_certs
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: array
Default: []
tls.client_certs[].cert
A plain text certificate to use. Type:string
Default: ""
tls.client_certs[].key
A plain text certificate key to use. Type:string
Default: ""
tls.client_certs[].cert_file
The path of a certificate to use. Type:string
Default: ""
tls.client_certs[].key_file
The path of a certificate key to use. Type:string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. Warning: Since it does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
Type: string
Default: ""
sasl
Enables SASL authentication. Type:object
sasl.mechanism
The SASL authentication mechanism. If left empty, SASL authentication is not used. Type: string
Default: "none"
Option | Summary |
---|---|
OAUTHBEARER | OAuth Bearer based authentication. |
PLAIN | Plain text authentication. NOTE: When using plain text auth it is extremely likely that you’ll also need to enable TLS. |
SCRAM-SHA-256 | Authentication using the SCRAM-SHA-256 mechanism. |
SCRAM-SHA-512 | Authentication using the SCRAM-SHA-512 mechanism. |
none | Default, no SASL authentication. |
sasl.user
A PLAIN username. It is recommended that you use environment variables to populate this field. Type:string
Default: ""
sasl.password
A PLAIN password. It is recommended that you use environment variables to populate this field. Type:string
Default: ""
sasl.access_token
A static OAUTHBEARER access token Type:string
Default: ""
sasl.token_cache
Instead of using a static access_token, allows you to query a cache resource to fetch OAUTHBEARER tokens from.
Type: string
Default: ""
sasl.token_key
Required when using a token_cache, the key to query the cache with for tokens.
Type: string
Default: ""
topic
The topic to publish messages to. Type:string
client_id
An identifier for the client connection. Type:string
Default: "tyk"
target_version
The version of the Kafka protocol to use. This limits the capabilities used by the client and should ideally match the version of your brokers. Defaults to the oldest supported stable version. Type:string
rack_id
A rack identifier for this client. Type:string
Default: ""
key
The key to publish messages with. Type:string
Default: ""
partitioner
The partitioning algorithm to use. Type:string
Default: "fnv1a_hash"
Options: fnv1a_hash, murmur2_hash, random, round_robin, manual.
partition
The manually-specified partition to publish messages to, relevant only when the field partitioner is set to manual. Must be able to parse as a 32-bit integer.
Type: string
Default: ""
custom_topic_creation
If enabled, topics will be created with the specified number of partitions and replication factor if they do not already exist. Type:object
custom_topic_creation.enabled
Whether to enable custom topic creation. Type:bool
Default: false
custom_topic_creation.partitions
The number of partitions to create for new topics. Leave at -1 to use the broker configured default. Otherwise must be >= 1.
Type: int
Default: -1
custom_topic_creation.replication_factor
The replication factor to use for new topics. Leave at -1 to use the broker configured default. Must be an odd number, and less than or equal to the number of brokers. Type: int
Default: -1
compression
The compression algorithm to use. Type:string
Default: "none"
Options: none, snappy, lz4, gzip, zstd.
static_headers
An optional map of static headers that should be added to messages in addition to metadata. Type:object
metadata
Specify criteria for which metadata values are sent with messages as headers. Type:object
metadata.exclude_prefixes
Provide a list of explicit metadata key prefixes to be excluded when adding metadata to sent messages. Type:array
Default: []
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput. Type:int
Default: 64
idempotent_write
Enable the idempotent write producer option. This requires the IDEMPOTENT_WRITE permission on CLUSTER and can be disabled if this permission is not available.
Type: bool
Default: false
ack_replicas
Ensure that messages have been copied across all replicas before acknowledging receipt. Type:bool
Default: false
max_msg_bytes
The maximum size in bytes of messages sent to the target topic. Type:int
Default: 1000000
timeout
The maximum period of time to wait for message sends before abandoning the request and retrying. Type:string
Default: "5s"
retry_as_batch
When enabled forces an entire batch of messages to be retried if any individual message fails on a send, otherwise only the individual messages that failed are retried. Disabling this helps to reduce message duplicates during intermittent errors, but also makes it impossible to guarantee strict ordering of messages. Type:bool
Default: false
batching
Allows you to configure a batching policy. Type:object
batching.count
A number of messages at which the batch should be flushed. Set this to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. Set this to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size. Type:string
Default: ""
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. Type:array
max_retries
The maximum number of retries before giving up on the request. If set to zero there is no discrete limit. Type:int
Default: 0
backoff
Control time intervals between retry attempts. Type:object
backoff.initial_interval
The initial period to wait between retry attempts. Type:string
Default: "3s"
backoff.max_interval
The maximum period to wait between retry attempts Type:string
Default: "10s"
backoff.max_elapsed_time
The maximum overall period of time to spend on retry attempts before the request is aborted. Setting this value to a zeroed duration (such as 0s) will result in unbounded retries.
Type: string
Default: "30s"
Processors
Overview
Tyk Streams processors are functions applied to messages passing through a pipeline. Processors are set via config, and depending on where in the config they are placed they will be run either immediately after a specific input (set in the input section), on all messages (set in the pipeline section) or before a specific output (set in the output section). Most processors apply to all messages and can be placed in the pipeline section.
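For example, the sketch below places the avro processor (documented later in this section) in the pipeline; the thread count is illustrative:

```yaml
pipeline:
  threads: 4
  processors:
    - avro:
        operator: to_json
```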
The threads field in the pipeline section determines how many parallel processing threads are created. You can read more about parallel processing in the pipeline guide.
Labels
Processors have an optional field label that can uniquely identify them in observability data such as logs.
Avro
Note: If you are consuming or generating messages using a schema registry service then it is likely this processor will fail, as those services require messages to be prefixed with the identifier of the schema version being used.
Operators
to_json
Converts Avro documents into a JSON structure. This makes it easier to manipulate the contents of the document within Tyk Streams. The encoding field specifies how the source documents are encoded.

from_json
Attempts to convert JSON documents into Avro documents according to the specified encoding.
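A sketch converting Avro documents to JSON using an inline schema (the schema itself is illustrative):

```yaml
pipeline:
  processors:
    - avro:
        operator: to_json
        encoding: textual
        schema: |
          {
            "type": "record",
            "name": "Order",
            "fields": [
              { "name": "id", "type": "string" },
              { "name": "amount", "type": "double" }
            ]
          }
```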
Fields

operator
The operator to execute Type:string
Options: to_json, from_json.
encoding
An Avro encoding format to use for conversions to and from a schema. Type:string
Default: "textual"
Options: textual, binary, single.
schema
A full Avro schema to use. Type:string
Default: ""
schema_path
The path of a schema document to apply. Use either this or the schema field.
Type: string
Default: ""
Tracers
Overview
A tracer type represents a destination for Tyk Streams to send tracing events to, such as Jaeger. When a tracer is configured all messages will be allocated a root span during ingestion that represents their journey through a Streams pipeline. Many Streams processors create spans, and so tracing is a great way to analyse the pathways of individual messages as they progress through a Streams instance. Some inputs, such as http_server and http_client, are capable of extracting a root span from the source of the message (HTTP headers). This is a work in progress and should eventually expand so that all inputs have a way of doing so. Other inputs, such as kafka, can be configured to extract a root span by using the extract_tracing_map field.
A tracer config section looks like this:
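A sketch assuming the root-level key is tracer (the Jaeger fields are documented below; the agent address is illustrative):

```yaml
tracer:
  jaeger:
    agent_address: localhost:6831
    sampler_type: const
    sampler_param: 1
```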
Note: Although the configuration spec of this component is stable, the format of spans, tags and logs created by Streams is subject to change as it is tuned for improvement.
Jaeger
Fields
agent_address
The address of a Jaeger agent to send tracing events to. Type:string
Default: ""
collector_url
The URL of a Jaeger collector to send tracing events to. If set, this will override agent_address.
Type: string
Default: ""
sampler_type
The sampler type to use. Type:string
Default: "const"
Option | Summary |
---|---|
const | Sample a percentage of traces. 1 or more means all traces are sampled, 0 means no traces are sampled and anything in between means a percentage of traces are sampled. Tuning the sampling rate is recommended for high-volume production workloads. |
sampler_param
A parameter to use for sampling. This field is unused for some sampling types. Type:float
Default: 1
tags
A map of tags to add to tracing spans. Type:object
Default: {}
flush_interval
The period of time between each flush of tracing spans. Type:string
OpenTelemetry Collector
Note: This component is experimental and therefore subject to change or removal outside of major version releases.
Fields
http
A list of http collectors. Type:array
http[].address
The endpoint of a collector to send tracing events to. Type:string
http[].secure
Connect to the collector over HTTPS Type:bool
Default: false
grpc
A list of grpc collectors. Type:array
grpc[].address
The endpoint of a collector to send tracing events to. Type:string
grpc[].secure
Connect to the collector with client transport security Type:bool
Default: false
tags
A map of tags to add to all tracing spans. Type:object
Default: {}
sampling
Settings for trace sampling. Sampling is recommended for high-volume production workloads. Type:object
sampling.enabled
Whether to enable sampling. Type:bool
Default: false
sampling.ratio
Sets the ratio of traces to sample. Type:float
Metrics
Overview
Streams emits lots of metrics in order to expose how components configured within your pipeline are behaving. You can configure exactly where these metrics end up with the config field metrics, which describes a metrics format and destination. For example, if you wished to push them via the Prometheus protocol you could use this configuration:
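A sketch using the Prometheus fields documented below; the push gateway URL field name (push_url) and its value are assumptions:

```yaml
metrics:
  prometheus:
    prefix: tyk
    push_interval: 5s
    push_job_name: tyk_streams
    push_url: http://localhost:9091   # assumed field name for the push gateway
```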
Metric Names
Metrics are emitted with a prefix that can be configured with the field prefix. The default prefix is bento. The following metrics are emitted with the respective types:
Gauges
{prefix}_input_count: Number of inputs currently active.
{prefix}_output_count: Number of outputs currently active.
{prefix}_processor_count: Number of processors currently active.
{prefix}_cache_count: Number of caches currently active.
{prefix}_condition_count: Number of conditions currently active.
{prefix}_input_connection_up: 1 if a particular input is connected, 0 if it is not.
{prefix}_output_connection_up: 1 if a particular output is connected, 0 if it is not.
{prefix}_input_running: 1 if a particular input is running, 0 if it is not.
{prefix}_output_running: 1 if a particular output is running, 0 if it is not.
{prefix}_processor_running: 1 if a particular processor is running, 0 if it is not.
{prefix}_cache_running: 1 if a particular cache is running, 0 if it is not.
{prefix}_condition_running: 1 if a particular condition is running, 0 if it is not.
{prefix}_buffer_running: 1 if a particular buffer is running, 0 if it is not.
{prefix}_buffer_available: The number of messages that can be read from a buffer.
{prefix}_input_retry: The number of active retry attempts for a particular input.
{prefix}_output_retry: The number of active retry attempts for a particular output.
{prefix}_processor_retry: The number of active retry attempts for a particular processor.
{prefix}_cache_retry: The number of active retry attempts for a particular cache.
{prefix}_condition_retry: The number of active retry attempts for a particular condition.
{prefix}_buffer_retry: The number of active retry attempts for a particular buffer.
{prefix}_threads_active: The number of processing threads currently active.
Counters
{prefix}_input_received: Count of messages received by a particular input.
{prefix}_input_batch_received: Count of batches received by a particular input.
{prefix}_output_sent: Count of messages sent by a particular output.
{prefix}_output_batch_sent: Count of batches sent by a particular output.
{prefix}_processor_processed: Count of messages processed by a particular processor.
{prefix}_processor_batch_processed: Count of batches processed by a particular processor.
{prefix}_processor_dropped: Count of messages dropped by a particular processor.
{prefix}_processor_batch_dropped: Count of batches dropped by a particular processor.
{prefix}_processor_error: Count of errors returned by a particular processor.
{prefix}_processor_batch_error: Count of batch errors returned by a particular processor.
{prefix}_cache_hit: Count of cache key lookups that found a value.
{prefix}_cache_miss: Count of cache key lookups that did not find a value.
{prefix}_cache_added: Count of new cache entries.
{prefix}_cache_err: Count of errors that occurred during a cache operation.
{prefix}_condition_hit: Count of condition checks that passed.
{prefix}_condition_miss: Count of condition checks that failed.
{prefix}_condition_error: Count of errors that occurred during a condition check.
{prefix}_buffer_added: Count of messages added to a particular buffer.
{prefix}_buffer_batch_added: Count of batches added to a particular buffer.
{prefix}_buffer_read: Count of messages read from a particular buffer.
{prefix}_buffer_batch_read: Count of batches read from a particular buffer.
{prefix}_buffer_ack: Count of messages removed from a particular buffer.
{prefix}_buffer_batch_ack: Count of batches removed from a particular buffer.
{prefix}_buffer_nack: Count of messages that failed to be removed from a particular buffer.
{prefix}_buffer_batch_nack: Count of batches that failed to be removed from a particular buffer.
{prefix}_buffer_err: Count of errors that occurred during a buffer operation.
{prefix}_buffer_batch_err: Count of batch errors that occurred during a buffer operation.
{prefix}_input_error: Count of errors that occurred during an input operation.
{prefix}_input_batch_error: Count of batch errors that occurred during an input operation.
{prefix}_output_error: Count of errors that occurred during an output operation.
{prefix}_output_batch_error: Count of batch errors that occurred during an output operation.
{prefix}_resource_cache_error: Count of errors that occurred during a resource cache operation.
{prefix}_resource_condition_error: Count of errors that occurred during a resource condition operation.
{prefix}_resource_input_error: Count of errors that occurred during a resource input operation.
{prefix}_resource_processor_error: Count of errors that occurred during a resource processor operation.
{prefix}_resource_output_error: Count of errors that occurred during a resource output operation.
{prefix}_resource_rate_limit_error: Count of errors that occurred during a resource rate limit operation.
Timers
{prefix}_input_latency
Latency of a particular input.
{prefix}_input_batch_latency
Latency of a particular input at the batch level.
{prefix}_output_latency
Latency of a particular output.
{prefix}_output_batch_latency
Latency of a particular output at the batch level.
{prefix}_processor_latency
Latency of a particular processor.
{prefix}_processor_batch_latency
Latency of a particular processor at the batch level.
{prefix}_condition_latency
Latency of a particular condition.
{prefix}_condition_batch_latency
Latency of a particular condition at the batch level.
{prefix}_cache_latency
Latency of a particular cache.
{prefix}_buffer_latency
Latency of a particular buffer.
{prefix}_buffer_batch_latency
Latency of a particular buffer at the batch level.
Metric Labels
All metrics are emitted with the following labels:
path
The path of the component within the config.
label
A custom label for the component, which is optional and falls back to the component type.
Prometheus
Advanced
Fields
prefix
A string prefix for all metrics. Type:string
Default: "bento"
push_interval
The interval between pushing metrics to the push gateway. Type:string
Default: ""
push_job_name
A job name to attach to metrics pushed to the push gateway. Type:string
Default: "bento_push"
push_url
The URL to push metrics to. Type:string
Default: ""
push_basic_auth
Basic authentication configuration for the push gateway. Type:object
push_basic_auth.enabled
Whether to use basic authentication when pushing metrics. Type:bool
Default: false
push_basic_auth.username
The username to authenticate with. Type:string
Default: ""
push_basic_auth.password
The password to authenticate with. Type:string
Default: ""
file_path
The file path to write metrics to. Type:string
Default: ""
use_histogram_timing
Whether to use histogram metrics for timing values. When set to false, summary metrics are used instead. Type:bool
Default: false
histogram_buckets
A list of duration buckets to track when use_histogram_timing is set to true. Type:array
Default: [0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1.0]
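As a sketch of how these fields fit together, a Prometheus configuration that pushes metrics to a push gateway might look like the following. The gateway URL and interval are placeholder values for illustration only:

```yaml
metrics:
  prometheus:
    prefix: bento
    push_url: http://localhost:9091   # placeholder push gateway address
    push_interval: 10s
    push_job_name: bento_push
    push_basic_auth:
      enabled: false
    use_histogram_timing: false
```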
Common Configuration
Batching
Tyk Streams is able to join sources and sinks with sometimes conflicting batching behaviours without sacrificing its strong delivery guarantees. Therefore, batching within Tyk Streams is a mechanism that serves multiple purposes:
Performance
For most users the only benefit of batching messages is improving throughput over your output protocol. For some protocols this can happen in the background and requires no configuration from you. However, if an output has a batching configuration block this means it benefits from batching and requires you to specify how you'd like your batches to be formed by configuring a batching policy:
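A sketch of what such a policy can look like on an output; the kafka output and its settings are used here only as a plausible example of an output that exposes a batching block:

```yaml
output:
  kafka:
    addresses: [ localhost:9092 ]   # placeholder broker address
    topic: example_topic
    batching:
      count: 100         # flush after 100 messages
      byte_size: 100000  # or after roughly 100kB
      period: 5s         # or after 5 seconds, whichever comes first
```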
Sometimes you may prefer to create your batches before processing. In that case, if your input doesn't already support a batch policy, you can instead use a broker, which also allows you to combine inputs under a single batch policy:
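For example, a broker combining two inputs under one batch policy might be sketched as follows; the child input types and their URLs are assumptions for illustration:

```yaml
input:
  broker:
    inputs:
      - http_client:
          url: https://example.com/feed   # placeholder source
      - http_client:
          url: https://example.org/feed   # placeholder source
    batching:
      count: 50
      period: 1s
```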
Compatibility
Tyk Streams is able to read and write over protocols that support multiple part messages, and all payloads travelling through Tyk Streams are represented as a multiple part message. Therefore, all components within Tyk Streams are able to work with multiple parts in a message as standard. When messages reach an output that doesn't support multiple parts the message is broken down into an individual message per part, and then one of two behaviours happens depending on the output. If the output supports batch sending messages then the collection of messages is sent as a single batch. Otherwise, Tyk Streams falls back to sending the messages sequentially in multiple, individual requests. This behaviour means that not only can multiple part message protocols be easily matched with single part protocols, but also that the concepts of multiple part messages and message batches are interchangeable within Tyk Streams.
Batch Policy
When an input or output component has a config field batching it supports a batch policy. This is a mechanism that allows you to configure exactly how batching should work on messages before they are routed to the input or output it's associated with. Batches are considered complete and will be flushed downstream when any of the following conditions are met:
- The byte_size field is non-zero and the total size of the batch in bytes matches or exceeds it (disregarding metadata).
- The count field is non-zero and the total number of messages in the batch matches or exceeds it.
- The period field is non-empty and the time since the last batch exceeds its value.
A batch policy has the capability to create batches, but not to break them down.
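As a sketch, a batch policy combining all three flush conditions with a batch-level processor might look like the following; the archive processor shown is only an assumed example of a processor applied to the flushed batch:

```yaml
batching:
  count: 200
  byte_size: 500000
  period: 10s
  processors:
    - archive:
        format: json_array   # assumed example: combine the batch into a single JSON array message
```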
Field Paths
Many components within Tyk Streams allow you to target certain fields using a JSON dot path. The syntax of a path within Tyk Streams is similar to JSON Pointers, except with dot separators instead of slashes (and no leading dot). When a path is used to set a value, any path segment that does not yet exist in the structure is created as an object. For example, with the following JSON structure the path foo.bar would return 21:
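(The original example structure is not shown in the source; the following is a minimal reconstruction consistent with the surrounding text.)

```json
{
  "foo": {
    "bar": 21
  }
}
```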
The characters ~ (%x7E) and . (%x2E) have special meaning in Tyk Streams paths. Therefore ~ needs to be encoded as ~0 and . needs to be encoded as ~1 when these characters appear within a key.
For example, with the following JSON structure the path foo~1foo.bar~0bo..baz would return 22:
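(The example structure itself is not included in the source; the following reconstruction decodes the escaped path, so foo~1foo is the key foo.foo, bar~0bo is the key bar~bo, and the double dot addresses an empty key.)

```json
{
  "foo.foo": {
    "bar~bo": {
      "": {
        "baz": 22
      }
    }
  }
}
```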
Arrays
When Tyk Streams encounters an array whilst traversing a JSON structure it requires the next path segment to be either an integer referencing an existing index, or, depending on whether the path is used to query or set the target value, the character * or - respectively.
For example, with the following JSON structure the path foo.2.bar would return 23:
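(The source does not show the structure; a minimal reconstruction where foo is an array whose third element contains bar is:)

```json
{
  "foo": [
    { "bar": 21 },
    { "bar": 22 },
    { "bar": 23 }
  ]
}
```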
Querying
When a query reaches an array, the character * indicates that the query should return the value of the remaining path from each element of the array (returned within an array).
Setting
When an array is reached, the character - indicates that a new element should be appended to the end of the existing elements. If this character is not the final segment of the path then an object is created.
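The following is a sketch of both behaviours, based on the rules described above rather than an example taken from the source, using the array structure from the earlier example:

```yaml
# Given the structure: {"foo": [{"bar": 21}, {"bar": 22}, {"bar": 23}]}
#
# Querying with the path "foo.*.bar" returns each element's value within an array: [21, 22, 23]
# Setting a value of 24 at "foo.-.bar" appends a new element:
#   {"foo": [{"bar": 21}, {"bar": 22}, {"bar": 23}, {"bar": 24}]}
```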
Processing Pipelines
Within a Tyk Streams configuration, in between input and output, is a pipeline section. This section describes an array of processors that are to be applied to all messages, and are not bound to any particular input or output.
If you have processors that are heavy on CPU and aren’t specific to a certain input or output they are best suited for the pipeline section. It is advantageous to use the pipeline section as it allows you to set an explicit number of parallel threads of execution:
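A minimal sketch of a pipeline section with an explicit thread count; the processor shown is only a placeholder for illustration:

```yaml
pipeline:
  threads: 4          # run four parallel processing threads
  processors:
    - mapping: |      # placeholder processor for this sketch
        root = this
```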
If threads is set to -1 (the default) it will automatically match the number of logical CPUs available. By default almost all Tyk Streams sources will utilize as many processing threads as have been configured, which makes horizontal scaling easy.