%% %% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. %% -module(thrift_framed_transport). -behaviour(thrift_transport). %% constructor -export([new/1]). %% protocol callbacks -export([read/2, read_exact/2, write/2, flush/1, close/1]). -record(t_framed, { wrapped, read_buffer, write_buffer }). -type state() :: #t_framed{}. -spec new(Transport::thrift_transport:t_transport()) -> thrift_transport:t_transport(). new(Wrapped) -> State = #t_framed{ wrapped = Wrapped, read_buffer = [], write_buffer = [] }, thrift_transport:new(?MODULE, State). -include("thrift_transport_behaviour.hrl"). read(State = #t_framed{wrapped = Wrapped, read_buffer = Buffer}, Len) when is_integer(Len), Len >= 0 -> Binary = iolist_to_binary(Buffer), case Binary of <<>> when Len > 0 -> case next_frame(Wrapped) of {NewState, {ok, Frame}} -> NewBinary = iolist_to_binary([Binary, Frame]), Give = min(iolist_size(NewBinary), Len), {Result, Remaining} = split_binary(NewBinary, Give), {State#t_framed{wrapped = NewState, read_buffer = Remaining}, {ok, Result}}; {NewState, Error} -> {State#t_framed{wrapped = NewState}, Error} end; %% read of zero bytes <<>> -> {State, {ok, <<>>}}; %% read buffer is nonempty _ -> Give = min(iolist_size(Binary), Len), {Result, Remaining} = split_binary(Binary, Give), {State#t_framed{read_buffer = Remaining}, {ok, Result}} end. read_exact(State = #t_framed{wrapped = Wrapped, read_buffer = Buffer}, Len) when is_integer(Len), Len >= 0 -> Binary = iolist_to_binary(Buffer), case iolist_size(Binary) of %% read buffer is larger than requested read size X when X >= Len -> {Result, Remaining} = split_binary(Binary, Len), {State#t_framed{read_buffer = Remaining}, {ok, Result}}; %% read buffer is insufficient for requested read size _ -> case next_frame(Wrapped) of {NewState, {ok, Frame}} -> read_exact( State#t_framed{wrapped = NewState, read_buffer = [Buffer, Frame]}, Len ); {NewState, Error} -> {State#t_framed{wrapped = NewState}, Error} end end. next_frame(Transport) -> case thrift_transport:read_exact(Transport, 4) of {NewState, {ok, <>}} -> thrift_transport:read_exact(NewState, FrameLength); Error -> Error end. write(State = #t_framed{write_buffer = Buffer}, Data) -> {State#t_framed{write_buffer = [Buffer, Data]}, ok}. flush(State = #t_framed{write_buffer = Buffer, wrapped = Wrapped}) -> case iolist_size(Buffer) of %% if write buffer is empty, do nothing 0 -> {State, ok}; FrameLen -> Data = [<>, Buffer], {Written, Response} = thrift_transport:write(Wrapped, Data), {Flushed, ok} = thrift_transport:flush(Written), {State#t_framed{wrapped = Flushed, write_buffer = []}, Response} end. close(State = #t_framed{wrapped = Wrapped}) -> {Closed, Result} = thrift_transport:close(Wrapped), {State#t_framed{wrapped = Closed}, Result}.