
|
Question wrt Producer/consumer pattern.
In a producer/consumer pattern, implemented via DSO-shared-LBQ, one typical problem is endpoint-management.
Consider:
1. Producer puts.
2. Consumer takes (at this point the element is off the shared Queue).
3. Now after take, while processing, if the Consumer dies, then one has lost the element on the Queue.
One can of course come up with a few workarounds. However one fairly easy way to do so is to have the consumer: Peek, Process and then Take. However peek is non-blocking, returning null if the queue is empty. Does anyone know of a LBQ-like implementation with a blocking peek?
Thanks.
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
|

|
Re: Question wrt Producer/consumer pattern.
To clarify - this of course, dosen't apply to all use cases, but only those where consumer processing is idempotent. Else the (o=q.take; o.process) needs to be transactional.
----- Original Message -----
From: "Sreenivasan Iyer" < siyer@...>
To: "tc-users" < tc-users@...>
Sent: 2008年4月15日 11時17分20秒 (GMT+0900) Asia/Tokyo
Subject: [tc-users] Question wrt Producer/consumer pattern.
In a producer/consumer pattern, implemented via DSO-shared-LBQ, one typical problem is endpoint-management.
Consider:
1. Producer puts.
2. Consumer takes (at this point the element is off the shared Queue).
3. Now after take, while processing, if the Consumer dies, then one has lost the element on the Queue.
One can of course come up with a few workarounds. However one fairly easy way to do so is to have the consumer: Peek, Process and then Take. However peek is non-blocking, returning null if the queue is empty. Does anyone know of a LBQ-like implementation with a blocking peek?
Thanks.
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
|

|
Re: Question wrt Producer/consumer pattern.
Would it be an option to subclass LBQ to populate a consumer->element map as a part of the take() (and just before removing from the queue), so that if a consumer dies you know what needs to be requeued? It requires the additional effort of monitoring your consumers and manually requeueing, but since peek-process-take allows only one consumer, it seems you'd need to monitor the consumer anyway to know when to restart it. Perhaps you have other mechanisms for that, though.
I'm just thinking this through as I type, though, so please feel free to point out logical flaws. Matt On Mon, Apr 14, 2008 at 9:49 PM, Sreenivasan Iyer < siyer@...> wrote:
To clarify - this of course, dosen't apply to all use cases, but only those where consumer processing is idempotent. Else the (o=q.take; o.process) needs to be transactional.
----- Original Message -----
From: "Sreenivasan Iyer" < siyer@...>
To: "tc-users" < tc-users@...>
Sent: 2008年4月15日 11時17分20秒 (GMT+0900) Asia/Tokyo
Subject: [tc-users] Question wrt Producer/consumer pattern.
In a producer/consumer pattern, implemented via DSO-shared-LBQ, one typical problem is endpoint-management.
Consider:
1. Producer puts.
2. Consumer takes (at this point the element is off the shared Queue).
3. Now after take, while processing, if the Consumer dies, then one has lost the element on the Queue.
One can of course come up with a few workarounds. However one fairly easy way to do so is to have the consumer: Peek, Process and then Take. However peek is non-blocking, returning null if the queue is empty. Does anyone know of a LBQ-like implementation with a blocking peek?
Thanks.
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
|

|
Re: Question wrt Producer/consumer pattern.
Here's another idea. We can
wrap LBQ along with a cache (ehcache for instance). For every take(),
we put that object into the cache with an expiry time. Clients are
obligated to call queue.commit(Object) to let the queue know that it's
done processing the object. If commit() comes before the object expires
in the cache, all is well. If not, on the event of the expiration
(through listener), we add the object back into the queue.
If client's commit() comes after the object has expired, the return
value of the commit() will be false so that it knows.
I've cooked up a some quick code with Ehcache and it seems doable:
// queue is set to expire taken objects within 2 seconds
LosslessLinkedBlockingQueue queue = new
LosslessLinkedBlockingQueue();
queue.put(new Integer(0));
queue.put(new Integer(1));
Object o = queue.take();
System.out.println("take first item: " + o + " and commit ["
+ queue.commit(System.identityHashCode(o)) + "], queue size = "
+ queue.size());
o = queue.take();
System.out.println("take second item: " + o + ", queue size = " +
queue.size());
System.out.println("Will sleep for 3s ");
Thread.sleep(3 * 1000); // wait more than the expired time of 2s
System.out.println("commit second item [" +
queue.commit(System.identityHashCode(o)) + "], queue size: " +
queue.size());
OUTPUT:
take first item: 0 and commit [true], queue size = 1
take second item: 1, queue size = 0
Will sleep for 3s
putting back uncommited object: 1 ---> log printed out by the queue
event listener
commit second item [false], queue size: 1
-----------
It's kinda crazy but what do we think?
Hung-
matt hoffman wrote:
Would it be an option to subclass LBQ to populate a
consumer->element map as a part of the take() (and just before
removing from the queue), so that if a consumer dies you know what
needs to be requeued? It requires the additional effort of monitoring
your consumers and manually requeueing, but since peek-process-take
allows only one consumer, it seems you'd need to monitor the consumer
anyway to know when to restart it. Perhaps you have other mechanisms
for that, though.
I'm just thinking this through as I type, though, so please feel free
to point out logical flaws.
Matt
On Mon, Apr 14, 2008 at 9:49 PM, Sreenivasan
Iyer < siyer@...>
wrote:
To
clarify - this of course, dosen't apply to all use cases, but only
those where consumer processing is idempotent. Else the (o=q.take;
o.process) needs to be transactional.
----- Original Message -----
From: "Sreenivasan Iyer" < siyer@...>
To: "tc-users" < tc-users@...>
Sent: 2008年4月15日 11時17分20秒 (GMT+0900) Asia/Tokyo
Subject: [tc-users] Question wrt Producer/consumer pattern.
In a producer/consumer pattern, implemented via DSO-shared-LBQ, one
typical problem is endpoint-management.
Consider:
1. Producer puts.
2. Consumer takes (at this point the element is off the shared Queue).
3. Now after take, while processing, if the Consumer dies, then one has
lost the element on the Queue.
One can of course come up with a few workarounds. However one fairly
easy way to do so is to have the consumer: Peek, Process and then Take.
However peek is non-blocking, returning null if the queue is empty.
Does anyone know of a LBQ-like implementation with a blocking peek?
Thanks.
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
|

|
Re: Question wrt Producer/consumer pattern.
The first step is to put the items in a map on put, not on take. If
you do it on take, you inevitably get this:
Object o = take();
// gap right here
cache.put(key, o);
There's a gap where the object is out of the queue, and not in the
cache. It can be done on the opposite side on put, by putting the
object into the map and then into the queue to eliminate the gap.
But there are more challenges ahead :)
Hung Huynh wrote:
Here's another idea. We
can
wrap LBQ along with a cache (ehcache for instance). For every take(),
we put that object into the cache with an expiry time. Clients are
obligated to call queue.commit(Object) to let the queue know that it's
done processing the object. If commit() comes before the object expires
in the cache, all is well. If not, on the event of the expiration
(through listener), we add the object back into the queue.
If client's commit() comes after the object has expired, the return
value of the commit() will be false so that it knows.
I've cooked up a some quick code with Ehcache and it seems doable:
// queue is set to expire taken objects within 2 seconds
LosslessLinkedBlockingQueue queue = new
LosslessLinkedBlockingQueue();
queue.put(new Integer(0));
queue.put(new Integer(1));
Object o = queue.take();
System.out.println("take first item: " + o + " and commit ["
+ queue.commit(System.identityHashCode(o)) + "], queue size = "
+ queue.size());
o = queue.take();
System.out.println("take second item: " + o + ", queue size = " +
queue.size());
System.out.println("Will sleep for 3s ");
Thread.sleep(3 * 1000); // wait more than the expired time of 2s
System.out.println("commit second item [" +
queue.commit(System.identityHashCode(o)) + "], queue size: " +
queue.size());
OUTPUT:
take first item: 0 and commit [true], queue size = 1
take second item: 1, queue size = 0
Will sleep for 3s
putting back uncommited object: 1 ---> log printed out by the queue
event listener
commit second item [false], queue size: 1
-----------
It's kinda crazy but what do we think?
Hung-
matt hoffman wrote:
Would it be an option to subclass LBQ to populate a
consumer->element map as a part of the take() (and just before
removing from the queue), so that if a consumer dies you know what
needs to be requeued? It requires the additional effort of monitoring
your consumers and manually requeueing, but since peek-process-take
allows only one consumer, it seems you'd need to monitor the consumer
anyway to know when to restart it. Perhaps you have other mechanisms
for that, though.
I'm just thinking this through as I type, though, so please feel free
to point out logical flaws.
Matt
On Mon, Apr 14, 2008 at 9:49 PM,
Sreenivasan
Iyer < siyer@...>
wrote:
To
clarify - this of course, dosen't apply to all use cases, but only
those where consumer processing is idempotent. Else the (o=q.take;
o.process) needs to be transactional.
----- Original Message -----
From: "Sreenivasan Iyer" < siyer@...>
To: "tc-users" < tc-users@...>
Sent: 2008年4月15日 11時17分20秒 (GMT+0900) Asia/Tokyo
Subject: [tc-users] Question wrt Producer/consumer pattern.
In a producer/consumer pattern, implemented via DSO-shared-LBQ, one
typical problem is endpoint-management.
Consider:
1. Producer puts.
2. Consumer takes (at this point the element is off the shared Queue).
3. Now after take, while processing, if the Consumer dies, then one has
lost the element on the Queue.
One can of course come up with a few workarounds. However one fairly
easy way to do so is to have the consumer: Peek, Process and then Take.
However peek is non-blocking, returning null if the queue is empty.
Does anyone know of a LBQ-like implementation with a blocking peek?
Thanks.
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
|

|
Re: Question wrt Producer/consumer pattern.
Here's the wrapper take() function
public Object take() throws InterruptedException {
Object o = queue.take();
cache.put(new Element(new Integer(System.identityHashcode(o), o));
return o;
}
It's not synchronized but the original take() is. I can't see how the
gap in between would be a problem?
Hung-
Taylor Gautier wrote:
The first step is to put the items in a map on put, not on take. If
you do it on take, you inevitably get this:
Object o = take();
// gap right here
cache.put(key, o);
There's a gap where the object is out of the queue, and not in the
cache. It can be done on the opposite side on put, by putting the
object into the map and then into the queue to eliminate the gap.
But there are more challenges ahead :)
Hung Huynh wrote:
Here's another idea. We
can
wrap LBQ along with a cache (ehcache for instance). For every take(),
we put that object into the cache with an expiry time. Clients are
obligated to call queue.commit(Object) to let the queue know that it's
done processing the object. If commit() comes before the object expires
in the cache, all is well. If not, on the event of the expiration
(through listener), we add the object back into the queue.
If client's commit() comes after the object has expired, the return
value of the commit() will be false so that it knows.
I've cooked up a some quick code with Ehcache and it seems doable:
// queue is set to expire taken objects within 2 seconds
LosslessLinkedBlockingQueue queue = new
LosslessLinkedBlockingQueue();
queue.put(new Integer(0));
queue.put(new Integer(1));
Object o = queue.take();
System.out.println("take first item: " + o + " and commit ["
+ queue.commit(System.identityHashCode(o)) + "], queue size = "
+ queue.size());
o = queue.take();
System.out.println("take second item: " + o + ", queue size = " +
queue.size());
System.out.println("Will sleep for 3s ");
Thread.sleep(3 * 1000); // wait more than the expired time of 2s
System.out.println("commit second item [" +
queue.commit(System.identityHashCode(o)) + "], queue size: " +
queue.size());
OUTPUT:
take first item: 0 and commit [true], queue size = 1
take second item: 1, queue size = 0
Will sleep for 3s
putting back uncommited object: 1 ---> log printed out by the queue
event listener
commit second item [false], queue size: 1
-----------
It's kinda crazy but what do we think?
Hung-
matt hoffman wrote:
Would it be an option to subclass LBQ to populate a
consumer->element map as a part of the take() (and just before
removing from the queue), so that if a consumer dies you know what
needs to be requeued? It requires the additional effort of monitoring
your consumers and manually requeueing, but since peek-process-take
allows only one consumer, it seems you'd need to monitor the consumer
anyway to know when to restart it. Perhaps you have other mechanisms
for that, though.
I'm just thinking this through as I type, though, so please feel free
to point out logical flaws.
Matt
On Mon, Apr 14, 2008 at 9:49 PM,
Sreenivasan
Iyer < siyer@...>
wrote:
To
clarify - this of course, dosen't apply to all use cases, but only
those where consumer processing is idempotent. Else the (o=q.take;
o.process) needs to be transactional.
----- Original Message -----
From: "Sreenivasan Iyer" < siyer@...>
To: "tc-users" < tc-users@...>
Sent: 2008年4月15日 11時17分20秒 (GMT+0900) Asia/Tokyo
Subject: [tc-users] Question wrt Producer/consumer pattern.
In a producer/consumer pattern, implemented via DSO-shared-LBQ, one
typical problem is endpoint-management.
Consider:
1. Producer puts.
2. Consumer takes (at this point the element is off the shared Queue).
3. Now after take, while processing, if the Consumer dies, then one has
lost the element on the Queue.
One can of course come up with a few workarounds. However one fairly
easy way to do so is to have the consumer: Peek, Process and then Take.
However peek is non-blocking, returning null if the queue is empty.
Does anyone know of a LBQ-like implementation with a blocking peek?
Thanks.
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
|

|
Re: Question wrt Producer/consumer pattern.
If the node dies in between take and put, you lose track of the
object. It's threadsafe, but doesn't do what the original requirement
specified.
Hung Huynh wrote:
Here's the wrapper take() function
public Object take() throws InterruptedException {
Object o = queue.take();
cache.put(new Element(new Integer(System.identityHashcode(o), o));
return o;
}
It's not synchronized but the original take() is. I can't see how the
gap in between would be a problem?
Hung-
Taylor Gautier wrote:
The first step is to put the items in a map on put, not on take. If
you do it on take, you inevitably get this:
Object o = take();
// gap right here
cache.put(key, o);
There's a gap where the object is out of the queue, and not in the
cache. It can be done on the opposite side on put, by putting the
object into the map and then into the queue to eliminate the gap.
But there are more challenges ahead :)
Hung Huynh wrote:
Here's another idea.
We
can
wrap LBQ along with a cache (ehcache for instance). For every take(),
we put that object into the cache with an expiry time. Clients are
obligated to call queue.commit(Object) to let the queue know that it's
done processing the object. If commit() comes before the object expires
in the cache, all is well. If not, on the event of the expiration
(through listener), we add the object back into the queue.
If client's commit() comes after the object has expired, the return
value of the commit() will be false so that it knows.
I've cooked up a some quick code with Ehcache and it seems doable:
// queue is set to expire taken objects within 2 seconds
LosslessLinkedBlockingQueue queue = new
LosslessLinkedBlockingQueue();
queue.put(new Integer(0));
queue.put(new Integer(1));
Object o = queue.take();
System.out.println("take first item: " + o + " and commit ["
+ queue.commit(System.identityHashCode(o)) + "], queue size = "
+ queue.size());
o = queue.take();
System.out.println("take second item: " + o + ", queue size = " +
queue.size());
System.out.println("Will sleep for 3s ");
Thread.sleep(3 * 1000); // wait more than the expired time of 2s
System.out.println("commit second item [" +
queue.commit(System.identityHashCode(o)) + "], queue size: " +
queue.size());
OUTPUT:
take first item: 0 and commit [true], queue size = 1
take second item: 1, queue size = 0
Will sleep for 3s
putting back uncommited object: 1 ---> log printed out by the queue
event listener
commit second item [false], queue size: 1
-----------
It's kinda crazy but what do we think?
Hung-
matt hoffman wrote:
Would it be an option to subclass LBQ to populate a
consumer->element map as a part of the take() (and just before
removing from the queue), so that if a consumer dies you know what
needs to be requeued? It requires the additional effort of monitoring
your consumers and manually requeueing, but since peek-process-take
allows only one consumer, it seems you'd need to monitor the consumer
anyway to know when to restart it. Perhaps you have other mechanisms
for that, though.
I'm just thinking this through as I type, though, so please feel free
to point out logical flaws.
Matt
On Mon, Apr 14, 2008 at 9:49 PM,
Sreenivasan
Iyer < siyer@...>
wrote:
To
clarify - this of course, dosen't apply to all use cases, but only
those where consumer processing is idempotent. Else the (o=q.take;
o.process) needs to be transactional.
----- Original Message -----
From: "Sreenivasan Iyer" < siyer@...>
To: "tc-users" < tc-users@...>
Sent: 2008年4月15日 11時17分20秒 (GMT+0900) Asia/Tokyo
Subject: [tc-users] Question wrt Producer/consumer pattern.
In a producer/consumer pattern, implemented via DSO-shared-LBQ, one
typical problem is endpoint-management.
Consider:
1. Producer puts.
2. Consumer takes (at this point the element is off the shared Queue).
3. Now after take, while processing, if the Consumer dies, then one has
lost the element on the Queue.
One can of course come up with a few workarounds. However one fairly
easy way to do so is to have the consumer: Peek, Process and then Take.
However peek is non-blocking, returning null if the queue is empty.
Does anyone know of a LBQ-like implementation with a blocking peek?
Thanks.
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
_______________________________________________
tc-users mailing list
tc-users@...
http://lists.terracotta.org/mailman/listinfo/tc-users
|